Flink剖析|No.2 为什么Flink无法实时写入MySQL?

抛出疑无路?

Flink 1.10 】- 使用flink-jdbc连接器的方式与MySQL交互,读数据和写数据都能完成,但是在写数据时,发现Flink程序执行完毕之后,才能在MySQL中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果? From @罗鹏程

相关代码片段:

JDBCAppendTableSink.builder()

.setDrivername("com.mysql.jdbc.Driver")

.setDBUrl("jdbc:mysql://localhost/flink")

.setUsername("root")

.setPassword("123456")

.setParameterTypes(

BasicTypeInfo.INT_TYPE_INFO,

BasicTypeInfo.STRING_TYPE_INFO)

.setQuery("insert into batch_size values(?,?)")

.build()

再现又一村!

【Flink-1.10】这个问题是知道一秒钟,不知磨洋工的Case,在初学时候非常容易遇上,那么真的是Flink不能实时写入MySQL吗?当然不是,上面代码基础之上简单的加上一行,就解决问题了:

...

.setBatchSize(1) //将写入MySQLbuffer大小为1。

..

向前一小步...

那么问题虽然解决了,根本原因是个啥呢?也许你看到这里会说,这问题很明显,就是Flink设计JDBC Sink的时候出于性能因素考虑,对写入buffer做了默认值设置,没错,这一点你说的很对,在 【Flink-1.10】 中JDBC OutputFormat的基类 AbstractJDBCOutputFormat里面和这相关的变量 DEFAULT_FLUSH_MAX_SIZE 默认值是5000,所以在你学习测试时候由于测试数据少(少于5000),数据一直在buffer中,直到数据源数据结束,作业也结束了,才将计算结果刷入MySQL,所以没有实时的(每条)写入MySQL。如下:

但这里还有个因素需要注意,那就是时间因素,上面DEFAULT_FLUSH_INTERVAL_MILLS默认值是0,这个相当于没有时间限制,一直等到buffer满了或者作业结束才能触发写出动作。也就是有些初学者,发现问题,即使故意 debug时候打上断点,不让作业结束,但是等到花儿都谢了,数据也没有写入到MySQL。

在【Flink 1.10】中 AbstractJDBCOutputFormat 有两个实现类, 

分别对应了如下两类Sink:

所以在 【Flink 1.10 中不论是 AppendTableSink和UpsertTableSink都会有同样的问题。 不过 UpsertTable Sink 时候用户可以设置时间, AppendTable Sink 连时间设置的 入口都木有

Flink 的锅?...

就这个问题而言,我个人 认为不是用户的问题,是Flink 1.10代码设计有进一步改进的空间。在Flink 1.11 中社区的确重构了,对JDBCOutputFormat 打了 @Deprecated。感兴趣可以查阅  F LINK-17537 了解变化过程。但是在这个改进中,并没有对   DEFA ULT_FLUSH_MAX_SIZE  默认值和  DEFAULT_FLUSH_INTERVAL_MILLS 默 认值做变化,社区也在积极的讨论改进方案,想参与社区贡献或者了解最终讨论结果的可以查阅 FLINK-16497。

举一反三

当然 在你学习过程中使用任何 Sink的时候,只要没有实时写入,都可以找找是否有 写出buffer和写出时间的限制设置。在这一点上, 罗鹏程  也提到了Elasticsearch也有类似问题,需要调用setBulkFlushMaxActions进行设置。

众人拾柴

期待你典型问 题的抛出...  我将知无不言...言无不尽... 我在又一村等你...

关注我的订阅号,我们并肩续航...

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章