Spark读取变更Hudi数据集Schema实现分析

1. 介绍

Hudi支持上层Hive/Presto/Spark查询引擎,其中使用Spark读取Hudi数据集方法非常简单,在spark-shell或应用代码中,通过 spark.sqlContext.read.format("org.apache.hudi").load 便可加载Hudi数据集,本篇文章分析具体的实现。

2. 分析

2.1 源码梳理

Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi / hudi 来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource ,其实现了 CreatableRelationProvider#createRelation 接口,并实现了读写逻辑。其中读逻辑实现的方法核心代码如下。

可以看到,对于读优化视图(ReadOptmized),会添加 HoodieROTablePathFilter ,其用于过滤Hudi数据集中的文件。而过滤主要逻辑在 HoodieROTablePathFilter#accept 方法中, HoodieROTablePathFilter 会处理Hudi数据集和非Hudi数据集,对于Hudi数据集而言,会选取分区路径下最新的提交的parquet文件。

接着通过 DataSource#resolveRelation 方法来解析parquet文件,关键逻辑如下

继续通过 DataSource#getOrInferFileFormatSchema 方法解析,其中一段关键代码如下

此时会根据不同的文件类型,如Orc/Text/Parquet类型来继续推导schema,其中tempFileIndex.allFiles获取到之前通过 HoodieROTableFilter 过滤出的所有最新提交的parquet文件, inferSchema 方法的关键代码如下

可以看到,当不需要合并schema时,是否需要需要合并schema可通过 mergeSchema 参数控制,当不需要时,默认获取的第一个文件,需要合并时,会 把所有文件的schema合并。其会影响spark查询结果,下面通过示例说明。

2.2 示例展示

2.2.1 schema配置

第一次插入时的schema如下

第二次更新时的schema如下(新增了sex列)

Hudi使用MOR模式。

2.2.2 插入/更新核心配置

写记录核心配置如下

更新记录核心配置如下

2.2.3 插入/更新实际数据设置

第一次插入实际数据为 {\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\"}

当第二次更新实际数据为 {\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\", \"sex\": \"male\"}

即第二次会更新一次写入的数据,那么使用如下代码显示数据时

那么会发现 结果包含了新增的sex列,未更新的值为null

当第二次更新实际数据为 {\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing1\", \"sex\": \"male\"}

即第二次会写入不同的分区,即不会更新第一次写入的数据,那么查询数据时,会发现查询的结果 不会出现新增的sex列

当使用如下代码显示数据时,设置合并schema参数,即会合并多个分区下的最新的parquet的schema。

会发现查询的结果 出现了新增的sex列

3. 总结

当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema 来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章