Getting Started

Scenario

This section describes capabilities of Hudi using spark-shell. Using the Spark data source, this section describes how to insert and update a Hudi dataset of the default storage mode Copy-on Write (COW) tables based on code snippets. After each write operation, you will be introduced how to read snapshot and incremental data.

Prerequisites

  • You have created a user and added the user to user groups hadoop (primary group) and hive on Manager.

Procedure

  1. Download and install the Hudi client. For details, see Installing a Client (Version 3.x or Later).

    Note

    Currently, Hudi is integrated in Spark2x. You only need to download the Spark2x client on Manager. For example, the client installation directory is /opt/client.

  2. Log in to the node where the client is installed as user root and run the following command:

    cd /opt/client

  3. Run the following commands to load environment variables:

    source bigdata_env

    source Hudi/component_env

    kinit Created user

    Note

    • You need to change the password of the created user, and then run the kinit command to log in to the system again.

    • In normal mode (Kerberos authentication disabled), you do not need to run the kinit command.

  4. Use spark-shell --master yarn-client to import Hudi packages to generate test data:

    // Import required packages.
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    // Define the table name and storage path to generate test data.
    val tableName = "hudi_cow_table"
    val basePath = "hdfs://hacluster/tmp/hudi_cow_table"
    val dataGen = new DataGenerator
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    
  5. Write data to the Hudi table in overwrite mode.

    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Overwrite).
    save(basePath)
    
  6. Query the Hudi table.

    Register a temporary table and query the table.

    val roViewDF = spark.
    read.
    format("org.apache.hudi").
    load(basePath + "/*/*/*/*")
    roViewDF.createOrReplaceTempView("hudi_ro_table")
    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_ro_table where fare > 20.0").show()
    
  7. Generate new data and update the Hudi table in append mode.

    val updates = convertToStringList(dataGen.generateUpdates(10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)
    
  8. Query incremental data in the Hudi table.

    • Reload data.

      spark.
      read.
      format("org.apache.hudi").
      load(basePath + "/*/*/*/*").
      createOrReplaceTempView("hudi_ro_table")
      
    • Perform the incremental query.

      val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
      val beginTime = commits(commits.length - 2)
      val incViewDF = spark.
      read.
      format("org.apache.hudi").
      option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      load(basePath);
      incViewDF.registerTempTable("hudi_incr_table")
      spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_incr_table where fare > 20.0").show()
      
  9. Perform the point-in-time query.

    val beginTime = "000"
    val endTime = commits(commits.length - 2)
    val incViewDF = spark.read.format("org.apache.hudi").
    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
    option(END_INSTANTTIME_OPT_KEY, endTime).
    load(basePath);
    incViewDF.registerTempTable("hudi_incr_table")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_incr_table where fare > 20.0").show()
    
  10. Delete data.

    • Prepare the data to be deleted.

      val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")
      val deletes = dataGen.generateDeletes(df.collectAsList())
      
    • Execute the deletion.

      val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
      df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION_OPT_KEY,"delete").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath);
      
    • Query data again.

      val roViewDFAfterDelete = spark.
      read.
      format("org.apache.hudi").
      load(basePath + "/*/*/*/*")
      roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
      spark.sql("select uuid, partitionPath from hudi_ro_table").show()