Apache Hudi与Delta Lake对比

1. 引入

在类Hadoop系统上支持ACID有了更大的吸引力,其中Databricks的Delta Lake和Uber开源的Hudi也成为了主要贡献者和竞争对手。两者都通过在“parquet”文件格式中提供不同的抽象以解决主要问题;很难选择一个比另一个更好。此博客将使用一个非常基本的示例来了解这些工具的工作原理,并让读者来比较两者的优缺点。

我们将使用与本系列下一篇文章中相反的方法,后面我们将讨论Hadoop上Data Lake的重要性,以及为什么会出现对诸如Delta/Hudi之类的系统的需求,以及数据工程师在过去如何为Lakes孤立地构建易错的ACID系统。

2. 初始化

2.1 环境

源数据库:AWS RDS MySQL

CDC工具:AWS DMS

Hudi:AWS EMR 5.29.0

Delta:Databricks运行时6.1

对象/文件存储:AWS S3

上面的工具集主要用于演示;也可以使用以下工具替代

源数据库:任何传统/基于云的RDBMS

CDC工具:Attunity,Oracle Golden Gate,Debezium,Fivetran,自定义Binlog解析器

Hudi:开源/企业Hadoop上的Apache Hudi

Delta:开源/企业Hadoop上的Delta Lake

对象/文件存储:ADLS / HDFS

2.2 数据准备步骤

create database demo;
use demo;
create table hudi_delta_test
(
pk_id integer,
name varchar(255),
value integer,
updated_at timestamp default now() on update now(),
created_at timestamp default now(),
constraint pk primary key(pk_id)
);
insert into hudi_delta_test(pk_id,name,value) values(1,’apple’,10);
insert into hudi_delta_test(pk_id,name,value) values(2,’samsung’,20);
insert into hudi_delta_test(pk_id,name,value) values(3,’dell’,30);
insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);

现在使用DMS将数据加载到S3中的某个位置,并使用文件夹名称full_load来标识该位置。为了更贴合标题,我们将跳过DMS的设置和配置。加载到S3后如下图所示。

接着在MySQL表中执行一些插入/更新/删除操作

insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);
update hudi_delta_test set value = 201 where pk_id=2;
delete from hudi_delta_test where pk_id=3;

继续略过DMS阶段,将CDC数据按以下方式加载到S3,如下图所示

意: DMS将填充一个名为“ Op”的附加字段,表示“操作”,Op取值I/U/D,分别对应插入、更新和删除。 以下图所示显示了CDC数据的内容。

df = spark.read.parquet('s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test')
df.show()

完成了数据准备后正式开始比对。DMS将持续将CDC事件传送到S3(供Hudi和Delta Lake使用),此S3为数据源。两种工具的最终状态都旨在获得一致的统一视图,如上图MySQL所示。

3. 使用Apache HUDI

Hudi有两种方式处理UPSERTS [1]

  • 写时复制(CoW):数据以列格式(Parquet)存储,并且在更新时会创建文件的新版本。此存储类型最适合于读繁重的工作负载,因为数据集的最新版本始终在有效的列格式文件中可用。

  • 读时合并(MoR):数据以列(Parquet)和基于行(Avro)的格式存储;更新记录到基于行的“增量文件”中,并在以后进行压缩,以创建列文件的新版本。此存储类型最适合于写繁重的工作负载,因为新提交会以增量文件的形式快速写入,但是读取数据集需要合并列文件与增量文件。

3.1 启动Spark Shell

使用以下命令打开Spark Shell并进行相关导入.

spark-shell — conf “spark.serializer=org.apache.spark.serializer.KryoSerializer” — conf “spark.sql.hive.convertMetastoreParquet=false” — jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._ 
import org.apache.hudi.DataSourceWriteOptions 
import org.apache.hudi.config.HoodieWriteConfig 
import org.apache.hudi.hive.MultiPartKeysValueExtractor

3.2 使用CoW

val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_cow”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_cow”
val hudiOptions = Map[String,String]
 (
 DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
 DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, 
 HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
 DataSourceWriteOptions.OPERATION_OPT_KEY ->
 DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
 DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “COPY_ON_WRITE”,
 DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, 
 DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, 
 DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
 DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, 
 DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
 DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
 )
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

由于在Hudi选项中使用了Hive自动同步配置,因此会在Hive中创建一个名为“ hudi_cow”的表。该表使用具有Hoodie格式的Parquet SerDe创建,表结构如下图所示。

表数据如下图所示

val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

进行更新操作,表“hudi_cow”将有最新的更新数据,如下图所示

如CoW定义中所述,当我们以hudi格式将updateDF写入同一S3位置时,更新的数据在写时被复制,并且快照和增量数据使用同一张表。

3.3 使用MoR

val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_mor”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_mor”
val hudiOptions = Map[String,String]
 (
 DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
 DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, 
 HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
 DataSourceWriteOptions.OPERATION_OPT_KEY ->
 DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
 DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “MERGE_ON_READ”,
 DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, 
 DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, 
 DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
 DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, 
 DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
 DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
 )
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

还是开启了Hive自动同步,将在Hive中创建两张名为“hudi_mor”和“ hudi_mor_rt”的表。hudi_mor是经过读优化的表,具有快照数据,而hudi_mor_rt将具有增量和实时合并数据。数据将会以频繁的压缩间隔被压缩,并提供给hudi_mor。hudi_mor_rt利用Avro格式存储增量数据。正如MoR定义所示,通过hudi_mor_rt读取数据时将即时合并。这对于高更新源表很有用,同时还提供一致且非最新的读优化表。

注意:“ hudi_mor”和“ hudi_mor_rt”都指向相同的S3存储桶,只是定义了不同的存储格式。

可以看到加载后两表内容相同,内容如下所示

val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

表hudi_mor在很短时间内就具有相同的内容(因为演示中的数据很小,并且很快会被压缩),只要merge成功,表hudi_mor_rt就会有最新数据。

现在看看这些Hudi格式表的S3日志的变化。底层存储格式为parquet,同时通过日志方式管理ACID。通常生成以下类型的文件:

  • hoodie_partition_metadata:这是一个小文件,包含有关给定分区中partitionDepth和最后一次commitTime的信息

  • hoodie.properties:存储表名称、存储类型信息

  • commit和clean:文件统计信息和有关正在写入的新文件的信息,以及诸如numWrites,numDeletes,numUpdateWrites,numInserts和一些其他相关审计字段之类的信息,存储在这些文件中。这些文件在每次提交后生成

以上3个文件对于CoW和MoR类型的表都是通用的。另外对于MoR表,额外有为UPSERTED分区创建的avro格式的日志文件。如下所示的第一个log文件是CoW表中不存在的日志文件。

4. 使用Delta Lake

使用下面的代码片段,我们以parquet格式读取完整的数据,并以delta格式将其写入不同的位置

from pyspark.sql.functions import *
inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
deltaTablePath = "s3://development-dl/demo/hudi-delta-demo/delta_table"
fullDF = spark.read.format("parquet").load(inputDataPath)
fullDF = fullDF.withColumn("Op",lit('I'))
fullDF.write.format("delta").mode("overwrite").save(deltaTablePath)

在Databricks Notebook的SQL界面中使用以下命令可以创建一个Hive外表,“using delta”关键字会包含基础SERDE和FILE格式的定义。

%sql
create table delta_table 
using delta
location 's3://development-dl/demo/hudi-delta-demo/delta_table'

该表的DDL如下所示。

%sql
show create table delta_table

表会包含与完整加载文件相同的所有记录。

%sql
select * from delta_table

使用以下命令读取CDC数据并在Hive中注册为临时视图

updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.createOrReplaceTempView("temp")

MERGE命令:下面是执行UPSERT的MERGE SQL,它作为SQL很方便地被执行,也可以在spark.sql()方法调用中执行

%sql
MERGE INTO delta_table target
USING 
(SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at
  FROM temp latest_changes
 INNER JOIN (
   SELECT pk_id,  max(updated_at) AS MaxDate
   FROM temp
   GROUP BY pk_id
) cm ON latest_changes.pk_id = cm.pk_id AND latest_changes.updated_at = cm.MaxDate) as source
ON source.pk_id == target.pk_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED  THEN INSERT *

MERGE之后,Hive中delta_table的内容也更新了。

%sql
select * from delta_table

与Hudi一样,Delta Lake基本文件存储格式也是“parquet”。Delta提供带有日志和版本控制的ACID功能。接着看看S3在装载和CDC合并后的变化。

增量日志包含JSON格式的日志,文件中包含每次提交后的schema和最新文件的信息。

在CDC合并的情况下,由于可以插入/更新或删除多条记录。初始parquet文件的内容分为多个较小的parquet文件,这些较小的文件会被重写。如果对表进行了分区,则仅与更新的分区相对应的CDC数据将受到影响。初始parquet文件仍存在于该文件夹中,但已从新的日志文件中删除。如果我们在此表上运行VACUUM,则可以物理删除该文件。也可以使用OPTIMIZE命令[6]来串联这些较小的文件。

Delta日志附加了另一个JSON格式的日志文件,该文件存储schema和指向最新文件的文件指针。

5. 总结

上述两个示例中都按原样保留了删除的记录,并通过Op ='D'标识删除,这是故意而为以显示DMS的功能,下面的参考资料显示了如何将这种软删除转换为硬删除。

希望这是一个有用的比较,有助于做出合理的选择,选择合适的数据湖框架。

参考资料

  1. https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/

  2. https://databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dms.html

  3. https://hudi.apache.org/

  4. https://docs.delta.io/

  5. https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html

  6. https://docs.databricks.com/delta/optimizations/index.html

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章