Table/SQL SCAN优化:应对source及上游的分区倾斜

好吧,终于要发一篇不是纯粹的广告文了。这些广告推文都是前段时间预先编排、排期好的,只是恰好排在在前几天发。

在正文之前,我想再提一下昨天的当当网图书大促推文 号外!当当网图书大促!送你专享优惠码!满减后再减!(400减230) (活动持续到6月3日,时间依然很充裕)。这次大促的优惠力度真的相当大,我是不希望大家错过一年难得一次的机会,优惠满减能帮你省下很多钱。建议大家不要一年散开来买太多次书,囤积起来一年在搞活动的时候买一两次就够了。你可以用省下的钱去做更多有意义的事情,比如这些钱绝对够买很多优秀的极客时间的专栏!

极客时间的专栏我买了不少,每天上下班路上都在看。很多工作多年的人,都有相似的感触,你的技能树看起来已经好久没变了,你的实际有效经验增长得也非常缓慢,周围的人或许也无法帮你摆脱技术上认知的束缚。这个时候如果你不觉得你应该多充电的话,那么随着你年龄的增长你会感觉到痛苦的。

言归正传,计算领域早已挂起SQL旋风,SQL是数据处理的标准语言,另一方面作为DSL提供了对一些复杂程序逻辑的良好抽象(抽象语法树[AST]可以表述大部分场景下data flow的DAG逻辑)。

但是从事物的两面性来看,抽象层级越高,你对底层细节的掌控就越困难。比如我们最近就遇到的一种情况:上游中间件中的分区数据出现了倾斜,那么反应到底层DAG中,数据处理的pipeline也会因此而倾斜(group by之前,我们暂且不讨论常规的hash分区造成的倾斜),而从纯粹的Flink SQL层面上我们却不太好控制它们的均衡方式。但如果我们是用DataStream API编写Job,那么事情就会简单很多:因为DataStream API给我们提供了shuffle的API,我们只需要在source后调用 rebalance 或其他shuffle类型的API,然后再接map算子进行数据解析,甚至可以单独提升map算子的并行度来提升数据解析的性能。

抽象程度高,绝大部分情况下是好事,就好像我们操作系统的UI一样。但是当你需要更多的个性化或者更高的控制权,你会一层一层向下找,从控制面板->注册表->BIOS->甚至BIOS高级设置。最后你还是不得不对细节有更多的了解。

其实在这里我们也一样,要解决这个问题,我们就希望控制SQL底层的shuffle机制,我们也需要沿着这个路径一层层向下找。我们从逻辑计划找到DataStream API对应的物理计划,然后在上层开放一个配置让用户来控制它。当然我们除了要从抽象层级路径向下找,我们还需要从SQL对应的语法树向下找,我们常规的Table Source其实对应着from子句(在AST中对应着table scan)。

所以我们很直接地找到table scan对应的物理计划所在的类 StreamTableSourceScan 以及 DataStreamScan (这两个类都继承自 StreamScan 接口)。为什么会有两个类呢,其实他们有各自的场景:

  • StreamTableSourceScan: 这是直接应对用户注册table source所对应的scan

  • DataStreamScan: 这是应对  DataStreamSource 也就是流转表(调用  tEnv#fromDataStream )的情况

无论以上哪个类,都存在 translateToPlan 方法用来转物理计划,我们主要关注这个方法。通过分析它的实现,我们看到真正对DataStream API的调用位于 StreamScan#convertToInternalRow 方法中。

OK,我们再分析一下我们的需求,在source后均衡一下上游分区中不均衡的数据,我们不能做的是什么: 改变SQL本身的分区策略 ,如果用户的SQL语句中包含 groupby 子句,那么我们不能对它的语义产生影响。我们看一下scan对应的DataStream API的调用,核心逻辑如下:

这里从 else/elseif 中我们看到,存在Row->CRow的过程,无论是 input->map 还是 input->process ,我们都可以在input后调用shuffle API,来重均衡数据,从而变成这样的一个处理链:input-> shuffle -> (row->crow),由于我们把shuffle加在转crow之前,所以它并不会破坏SQL本身的partition逻辑。

大致上,我们的改进逻辑如下:

//人为shuffle

//人为shuffle

这里我们可以让用户选择它想要的shuffle策略,但核心的原则是: 不能改变原始的数据量 ,比如像broadcast这种肯定是不能支持的。其实真正最有用的主要是 REBALANCE 。OK, 以上的示例代码展示了我们支持用户选择shuffle的策略,那么我们是如何让用户配置的呢。我们来看一下这个方法的参数列表:

这里我们看到一个 TableConfig 类型的config参数,它一直从Table/SQL 程序上下文穿透到物理计划层,为用户的程序提供相应的配置。我们只需要在它里面定义一个分区的枚举类型,然后开放相应的getter/setter即可。

另外,在你添加shuffle之后,会打破默认的operator chain,这时我们在生成JobGraph之后就可以针对shuffle之后的算子针对性地调整它的并行度,如果你们已经扩展了这个功能的话。没错,我们的Oceanus平台已经提供了这个功能。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章