Hudi 压缩(Compaction)实现分析

1. 介绍

压缩( compaction )用于在 MergeOnRead 存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。用户可通过 hudi-cli 提供的命令行显示触发 compaction 或者在使用 HoodieDeltaStreamer 将上游(Kafka/DFS)数据写入 hudi 数据集时进行相应配置,然后由系统自动进行 compaction 操作。

2. 分析

对于 compaction 操作,Hudi主要将其分为生成 HoodieCompactionPlan 和执行 HoodieCompactionPlan 两阶段。

2.1 生成HoodieCompactionPlan

生成 HoodieCompactionPlan 的主要入口在 HoodieWriteClient#scheduleCompaction 。其核心代码如下

首先获取新的 commitTime (单调递增),然后调用 scheduleCompactionAtInstant 生成 HoodieCompactionPlan ,其核心代码如下

该方法首先会进行校验,包括如果存在 inflight 状态的 instant ,那么最早的 instant 的时间一定要大于当前压缩的时间(可知 compaction 时不允许还有处于 inflight 状态的非 compaction 类型的 instant ),以及对于 commitdeltacommitcompaction 类型的 instant 的时间一定要小于当前压缩的时间( compaction 时必须保证所有 completedinflightrequestedcompaction 的时间必须小于当前压缩时间)。

调度生成 CompactionPlanscheduleCompaction 核心代码如下

可以看到首先会根据从上次进行 compact 之后是否又满足再次 compact 的条件(即 deltacommit 次数是否已经达到要求),然后再调用 generateCompactionPlan 方法生成计划,其核心代码如下

可以看到,首先会获取所有的分区,对于每个分区,获取最新的所有不属于正在进行 compaction 操作中的 FileSlice ,对于 FileSlice ,然后再获取对应的数据文件、日志文件、并计算指标信息后生成 CompactionOperation ,并过滤出增量日志不为空的 CompactionOperation ,然后根据 CompactionOperation 构造 HoodieCompactionOperation ,最后会根据 HoodieCompactionOperation 生成 HoodieCompactionPlan (会对这次的 HoodieCompactionOperation 和pending的 HoodieCompactionPlan 中的operations进行排序,过滤选出 HoodieCompactionOperation ),需确保同个文件不会存在于多个 HoodieCompactionPlan 中。

在生成完 HoodieCompactionPlan 后,会将其序列化后保存在 .hoodie/.aux 元数据目录下,此时状态为 requested ,此时便完成了 HoodieCompactionPlan 的生成和写入。

2.2 执行HoodieCompactionPlan

在生成完 HoodieCompactionPlan 并保存在文件中后,执行 compaction 时,最终会调用 HoodieWriteClient#compact 方法,其核心代码如下

方法首先会进行检查,如果包含了 inflight 状态的 instant ,则需要回滚(以这次 compaction 为准),然后再调用 runCompaction 方法执行 compaction ,其核心代码如下

可以看到,首先会从之前序列化的文件中反序列出 HoodieCompactionPlan ,然后变更状态后开始调用 compact 方法执行compact操作,该方法最终会调用 HoodieRealtimeTableCompactor#compact ,其核心代码如下

可以看到,其核心逻辑在于重新生成了一个 HoodieCopyOnWriteTable ,然后将 HoodieCompactionOperation 转化为 CompactionOperation ,最后继续调用 compact 进行压缩操作,其核心代码如下

可以看到核心流程是构造一个log增量日志文件的记录迭代器(后续单独分析),然后判断该 operation 下的数据文件是否为空,若为空,则将所有记录写入新的parquet数据文件,若不为空,则将增量日志文件记录更新至parquet数据文件(与旧的数据文件记录合并后写入parquet文件)。在写入数据文件后会将写入的指标信息写入文件中, 并且将 compaction 的状态标记为 completed ,同时会将其变更为 timeline 上的 commit (文件格式为 commitTime.commit )。

3. 总结

compaction 时只存在于 MergeOnRead 存储类型时的操作,其首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件( FileSlice ),然后生成 HoodieCompactionPlan(每个FileSlice对应一个HoodieCompactionOperation) 并将其序列化至文件中,然后在执行 compaction 操作时会将其从文件中反序列化,然后从 HoodieCompactionPlan 中获取 HoodieCompactionOperation 并进行压缩,即会构建一个用于迭代log增量日志文件的迭代器,然后与旧的parquet数据文件进行合并或写入parquet数据文件,完成后会将统计信息写入文件,而 completedcompaction 操作在 timeline 上表现为 commit

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章