Hudi MergeOnRead存储类型时Upsert分析

1. 引入

Hudi提供了两种存储类型,即 CopyOnWriteCOWMergeOnReadMORCOW 在数据插入时会直接写入parquet数据文件,对于更新时也会直接更新并写入新的parquet数据文件;而 MOR 在数据插入时会写入parquet数据文件,对于更新时则一般会写入log增量日志文件,而后进行压缩合并。之前在 Upsert在Hudi中的实现分析 已经分析过在 COW 类型下Hudi是如何处理 upsert ,这篇文章主要分析在 MOR 类型下Hudi是如何处理 upsert

2. 分析

COW 类型时,对于记录的 upsert ,其步骤如下:

  • 给记录打标签,即记录存在于哪些文件中,用于判断是进行更新还是插入操作。

  • 创建分区器用于重新分区。会创建多个  bucket ,其对应分区总数,每个  bucket 对应一个  FileId (已存在文件ID或新文件ID)和类型(  INSERT 、  UPDATE )。对于  INSERT 操作,在查找分区录下所有的小文件后,优先将记录插入到这些小文件中,若还剩余记录,则插入新文件。

  • 重新进行分区,不同分区获取对应的  bucket 后,则可知对该分区上的记录进行何种操作(由  bucket 类型决定),对于  UPDATE 操作,则合并老记录后写入新的parquet文件;对于  INSERT 操作,则直接写入新的parquet文件。

MOR 类型时,对于记录的 upsert ,总体步骤与上述类似,只是创建的分区器类型为 HoodieMergeOnReadTable.MergeOnReadUpsertPartitioner ,其为 HoodieCopyOnWriteTable.UpsertPartitioner 子类,两者在查找小文件时的表现不同。

2.1. Insert

对于记录的 insert 而言(分区对应的bucket类型为 INSERT ),最终会调用 HoodieMergeOnReadTable#handleInsert 方法来处理该操作,其核心代码如下

可以看到,其首先会判断所采用的索引是否支持索引日志文件,Hudi提供的三种类型的索引: HoodieBloomIndexHBaseIndexInMemoryHashIndex ,其中 HoodieBloomIndex 不支持索引日志文件,而其他两种均支持,支持索引表示可以对日志log文件进行插入操作,如只有log增量日志文件而无parquet数据文件(现在社区正打算对log增量日志文件支持索引,因此后续就可以直接写入log增量日志文件了)。

若支持索引日志文件,则会生成一个 MergeOnReadLazyInsertIterable 对象,其是 CopyOnWriteLazyInsertIterable 的子类,然后由其 consumeOneRecord 提供写入,其核心代码如下

可以看到其会借助 HoodieAppendHandle#write 完成真正的写入,具体对于log文件格式及写入Hudi做了很多优化,后续专门分析。

若不支持索引日志文件,则会调用父类的方法处理插入,即会生成一个 CopyOnWriteLazyInsertIterable 对象来处理写入,其会写入parquet数据文件,前面文章 Upsert在Hudi中的实现分析 已经分析过,不再赘述。

2.2. Update

对于记录的 update 而言(分区对应的bucket类型为 UPDATE ),最终会调用 HoodieMergeOnReadTable#handleUpdate 方法来处理该操作,其核心代码如下

可以看到,首先判断是否支持索引日志文件并且小文件集合中是否包含了正在操作的文件。

若不支持索引日志文件并且操作的文件为小文件,则直接调用父类的 HoodieCopyOnWrite#handleUpdate 方法将记录与老记录合并后写入新的parquet数据文件。

否则,则使用 HoodieAppendHandle 将记录写入log增量日志文件。

下面分析对于 HoodieMergeOnReadTable.MergeOnReadUpsertPartitioner 查找小文件的方法,这也与 HoodieCopyOnWriteTable.UpsertPartitioner 区分器的主要不同点。方法核心代码如下

该方法首先会过滤 completed 状态的 commitdeltacommit 类型的 timeline ,并找到最后一次 commit (可能是 commitdeltacommit ),为 MOR 类型时, timeline 中的 commit 表示已完成的 compact

若不支持索引日志文件,则查找最新的所有 FileSlice (由一个数据parquet数据文件和多个log增量日志文件组成)并且其数据文件大小小于配置的大小且无日志文件,然后排序后取最小的文件,该文件即为小文件(一个)。

若支持索引日志文件,则查找最新的所有 FileSlice 进行遍历,并利用log增量日志文件信息然后生成小文件(多个)。

该方法获取的小文件用于在 handleUpdate 时判断操作的文件是否为小文件,若为小文件并且不支持日志文件索引,则可直接更新该文件,否则生成新的log增量日志文件。

总结

对于 MOR 类型存储而言,数据写入及更新流程与 COW 大致相同;但对于 MOR 类型而言,在 insert 时,会根据是否支持索引日志文件来决定将记录写入log增量日志文件还是parquet数据文件(支持则写入log增量文件,否则写入parquet数据文件);在 update 时,其也会根据是否支持直接写入日志文件和更新的文件是否为小文件来决定是否合并新老记录写入parquet数据或者将新记录写入log增量日志文件中(不支持并且为小文件,则直接更新旧的parquet文件记录并写入新的parquet数据文件,否则写入log增量文件中)。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章