Unlocking New Ways to Delete Records in Apache Hudi

Follow the WeChat official account: ApacheHudi

1. Introduction

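Before release 0.5.1, a user who wanted to delete a record had to go through the Spark DataSource and set DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY to EmptyHoodieRecordPayload.class.getName to delete the specified records. The newly released Hudi 0.5.1 no longer requires this setting and instead offers three ways to delete records: the Hudi API, the Spark DataSource, and DeltaStreamer. Each is described below.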

2. Steps

2.1 Using the Hudi API

If your application already embeds a HoodieWriteClient, you can delete records directly with the following HoodieWriteClient API:

/**
   * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime. {@link HoodieKey}s will be
   * deduped and non-existent keys will be removed before deleting.
   *
   * @param keys {@link List} of {@link HoodieKey}s to be deleted
   * @param commitTime Commit time handle
   * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
   */
  public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime);
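The Javadoc above states that the keys are deduplicated and that keys which do not exist in the table are dropped before the delete runs. The following is a minimal, self-contained Java sketch of that contract against a hypothetical in-memory table. Key stands in for HoodieKey, and DeleteSemanticsSketch and InMemoryTable are illustrative names that are not part of Hudi; the sketch does not model commits, index lookups, or the returned WriteStatus RDD.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DeleteSemanticsSketch {

    // Hypothetical stand-in for HoodieKey: record key plus partition path.
    public record Key(String recordKey, String partitionPath) {}

    // Hypothetical in-memory table mapping each key to a row payload. Not part of Hudi.
    public static class InMemoryTable {
        private final Map<Key, String> rows = new LinkedHashMap<>();

        public void upsert(Key key, String payload) {
            rows.put(key, payload);
        }

        public boolean contains(Key key) {
            return rows.containsKey(key);
        }

        public int size() {
            return rows.size();
        }

        // Models the documented contract of delete(keys, commitTime):
        // dedupe the keys, drop keys absent from the table, delete the rest.
        // Returns how many records were actually removed.
        public int delete(List<Key> keys) {
            Set<Key> toDelete = new LinkedHashSet<>(keys);  // 1. dedupe
            toDelete.removeIf(k -> !rows.containsKey(k));   // 2. drop non-existent keys
            for (Key k : toDelete) {                         // 3. delete
                rows.remove(k);
            }
            return toDelete.size();
        }
    }

    public static void main(String[] args) {
        InMemoryTable table = new InMemoryTable();
        Key a = new Key("uuid-1", "americas/brazil/sao_paulo");
        Key b = new Key("uuid-2", "asia/india/chennai");
        table.upsert(a, "rider-213");
        table.upsert(b, "rider-284");

        // "a" is passed twice and "missing" was never written.
        Key missing = new Key("uuid-404", "americas/brazil/sao_paulo");
        int deleted = table.delete(List.of(a, a, missing));

        System.out.println("deleted=" + deleted + ", remaining=" + table.size());
    }
}
```

Running main prints deleted=1, remaining=1: the duplicate key collapses to one delete and the unknown key is simply skipped in this model.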

2.2 Using the DataSource

The following example shows how to use the DataSource API to delete records from the sample dataset, the same one used in the Quick Start guide.

1. Launch spark-shell
bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
2. Add the required imports
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
 
val tableName = "hudi_cow_table"
val basePath = "file:///tmp/hudi_cow_table"
val dataGen = new DataGenerator
3. Insert data
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Overwrite).
    save(basePath);
4. Query data
val roViewDF = spark.
    read.
    format("org.apache.hudi").
    load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
spark.sql("select distinct rider from hudi_ro_table").show() // show() returns Unit, so there is nothing useful to assign
// copy the value displayed to be used in next step
5. Prepare the records to delete

First, query for the set of records to be deleted:

val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")
6. Delete the data
val deletes = dataGen.generateDeletes(df.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(OPERATION_OPT_KEY, "delete"). // issue deletes instead of the default upsert
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath);
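Step 5 selects only uuid and partitionPath because, with OPERATION_OPT_KEY set to "delete", a record is identified by its key: the record-key field (uuid) plus the partition-path field (partitionpath) configured in the write options. As we understand the 0.5.1 DataSource writer, each incoming row is turned into such a key and the keys are then deleted much like the delete API in section 2.1. The Java sketch below models only that key-building step; Key, toKeys, and DeleteKeyExtractionSketch are hypothetical names, and rejecting rows with a missing field is this sketch's own simplification rather than Hudi's documented behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeleteKeyExtractionSketch {

    // Hypothetical stand-in for HoodieKey.
    public record Key(String recordKey, String partitionPath) {}

    // Builds one key per row from the configured field names.
    // This sketch simply rejects a row that lacks either field; real key
    // generators may handle a missing partition path differently.
    public static List<Key> toKeys(List<Map<String, String>> rows,
                                   String recordKeyField,
                                   String partitionPathField) {
        List<Key> keys = new ArrayList<>();
        for (Map<String, String> row : rows) {
            String recordKey = row.get(recordKeyField);
            String partitionPath = row.get(partitionPathField);
            if (recordKey == null || partitionPath == null) {
                throw new IllegalArgumentException("row is missing a key field: " + row);
            }
            keys.add(new Key(recordKey, partitionPath));
        }
        return keys;
    }

    public static void main(String[] args) {
        // Field names mirror RECORDKEY_FIELD_OPT_KEY and PARTITIONPATH_FIELD_OPT_KEY above.
        List<Map<String, String>> rows = List.of(
            Map.of("uuid", "uuid-1", "partitionpath", "americas/brazil/sao_paulo"),
            Map.of("uuid", "uuid-2", "partitionpath", "asia/india/chennai"));
        List<Key> keys = toKeys(rows, "uuid", "partitionpath");
        System.out.println(keys.size() + " keys, first=" + keys.get(0));
    }
}
```

Other columns such as rider play no part in identifying the record to delete, which is why a narrow projection is enough in step 5.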
7. Verify

Reload the table and check that the records have been deleted:

val roViewDFAfterDelete = spark.
    read.
    format("org.apache.hudi").
    load(basePath + "/*/*/*/*")
roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'").show() // should not return any rows

2.3 Using DeltaStreamer

Deleting with HoodieDeltaStreamer works the same way as an upsert: it relies on a specific boolean field named "_hoodie_is_deleted" in each record.

  • If the field is set to false or is absent, the record is treated as a regular upsert.
  • Otherwise (the field is set to true), the record is treated as a delete.

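This means the source schema must be changed to add this field, and every incoming record should set its value. We will try to relax this requirement in future releases.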
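The flag rule above boils down to a single predicate. Below is a minimal Java sketch, assuming each incoming record has already been parsed into a Map<String, Object>; IsDeletedFlagSketch and isDelete are hypothetical names and this is not DeltaStreamer's actual code. A record counts as a delete only when _hoodie_is_deleted is present and true.

```java
import java.util.Map;

public class IsDeletedFlagSketch {

    // Name of the flag field expected in incoming records.
    public static final String DELETE_FLAG = "_hoodie_is_deleted";

    // Hypothetical helper: true only when the flag is present and equal to Boolean.TRUE.
    // A missing flag (get returns null) or false means a regular upsert.
    public static boolean isDelete(Map<String, Object> record) {
        return Boolean.TRUE.equals(record.get(DELETE_FLAG));
    }

    public static void main(String[] args) {
        Map<String, Object> upsert = Map.of("uuid", "69cdb048-c93e-4532-adf9-f61ce6afe605", DELETE_FLAG, false);
        Map<String, Object> delete = Map.of("uuid", "19tdb048-c93e-4532-adf9-f61ce6afe10", DELETE_FLAG, true);
        Map<String, Object> noFlag = Map.of("uuid", "record-without-flag");
        System.out.println(isDelete(upsert) + " " + isDelete(delete) + " " + isDelete(noFlag));
    }
}
```

Running main prints false true false: an explicit false and a missing field both fall through to upsert.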

For example, suppose the original source schema is as follows:

{
  "type":"record",
  "name":"example_tbl",
  "fields":[{
     "name": "uuid",
     "type": "string"
  }, {
     "name": "ts",
     "type": "string"
  },  {
     "name": "partitionPath",
     "type": "string"
  }, {
     "name": "rank",
     "type": "long"
  }
]}

To use DeltaStreamer's delete support, the schema must then be changed as follows:

{
  "type":"record",
  "name":"example_tbl",
  "fields":[{
     "name": "uuid",
     "type": "string"
  }, {
     "name": "ts",
     "type": "string"
  },  {
     "name": "partitionPath",
     "type": "string"
  }, {
     "name": "rank",
     "type": "long"
  }, {
    "name" : "_hoodie_is_deleted",
    "type" : "boolean",
    "default" : false
  }
]}

A sample incoming record for an upsert:

{"ts": 0.0, "uuid": "69cdb048-c93e-4532-adf9-f61ce6afe605", "rank": 1034, "partitionPath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : false}

A sample incoming record for a delete:

{"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionPath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}

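This is a one-time change. DeltaStreamer then handles both upserts and deletes in every batch, each batch may contain a mix of upserts and deletes, and no additional steps or changes are needed afterwards.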
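To make the mixed-batch behaviour concrete, here is a toy in-memory model in Java. It assumes each key (partitionPath plus uuid) appears at most once per batch, so it does not model Hudi's precombine step, and MixedBatchSketch, rec, keyOf, and applyBatch are hypothetical names. Records with _hoodie_is_deleted set to true remove their key; all other records insert or overwrite it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MixedBatchSketch {

    // Hypothetical helper that builds one incoming record shaped like the samples above.
    public static Map<String, Object> rec(String uuid, String partitionPath, long rank, boolean deleted) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("uuid", uuid);
        r.put("partitionPath", partitionPath);
        r.put("rank", rank);
        r.put("_hoodie_is_deleted", deleted);
        return r;
    }

    // Hypothetical table key: partition path plus record key.
    public static String keyOf(Map<String, Object> record) {
        return record.get("partitionPath") + "/" + record.get("uuid");
    }

    // Applies one batch that mixes upserts and deletes.
    // Assumes each key occurs at most once per batch (no precombine is modelled).
    public static void applyBatch(Map<String, Map<String, Object>> table,
                                  List<Map<String, Object>> batch) {
        for (Map<String, Object> record : batch) {
            String key = keyOf(record);
            if (Boolean.TRUE.equals(record.get("_hoodie_is_deleted"))) {
                table.remove(key);        // delete: drop the key if present
            } else {
                table.put(key, record);   // upsert: insert or overwrite
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> table = new LinkedHashMap<>();
        String p = "americas/brazil/sao_paulo";

        // Batch 1: two plain upserts.
        applyBatch(table, List.of(rec("a", p, 1L, false), rec("b", p, 2L, false)));

        // Batch 2: an update and a delete in the same batch.
        applyBatch(table, List.of(rec("a", p, 10L, false), rec("b", p, 2L, true)));

        System.out.println(table.keySet());
    }
}
```

Running main prints [americas/brazil/sao_paulo/a]: the second batch updates record a and deletes record b in one pass, with no configuration change between batches.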

3. Summary

Hudi 0.5.1-incubating introduces three additional ways to delete records, and users can pick any of the approaches above to do so.