自定义StreamOperator

点击上方蓝

字关注~

在上一篇 StreamOperator源码简析 从源码角度分析了StreamOperator以及其实现类,此篇幅主要分析一下如何自定义一个StreamOperator。

StreamOperator接口提供了其生命周期的抽象方法,例如初始化方法setup、open、initializeState,checkpoint相关方法prepareSnapshotPreBarrier、snapshotState,但是我们没有必要去自己一一实现这些方法,可以继承其抽象类AbstractStreamOperator,覆盖一些我们需要重写的方法。 在上一篇分析中提到对于source端不需要接受上游数据,也就不需要实现OneInputStreamOperator或者TwoInputStreamOperator接口,如果我们需要接收上游数据就必须实现这两个接口中的一个,主要看一个输入还是两个输入来选择。

案例: 假设我们现在需要实现一个通用的定时、定量的输出的StreamOperator。

实现步骤:

  1. 继承AbstractStreamOperator抽象类,实现OneInputStreamOperator接口

  2. 重写open方法,调用flink 提供的定时接口,并且注册定时器

  3. 重写initializeState/snapshotState方法,由于批量写需要做缓存,那么需要保证数据的一致性,将缓存数据存在状态中

  4. 重写processElement方法,将数据存在缓存中,达到一定大小然后输出

  5. 由于需要做定时调用,那么需要有一个定时调用的回调方法,那么定义的类需要实现ProcessingTimeCallback接口,并且实现其onProcessingTime方法(关于flink定时可以参考定时系列文章)

代码:

publicabstractclassCommonSinkOperator<T extendsSerializable>extendsAbstractStreamOperator<Object>
implementsProcessingTimeCallback,OneInputStreamOperator<T,Object>{
 
privateList<T> list;
 
privateListState<T> listState;
 
privateint batchSize;
 
privatelong interval;
 
privateProcessingTimeService processingTimeService;
 
publicCommonSinkOperator(){
}
 
publicCommonSinkOperator(int batchSize,long interval){
this.chainingStrategy =ChainingStrategy.ALWAYS;
this.batchSize = batchSize;
this.interval = interval;
}
 
@Overridepublicvoid open()throwsException{
super.open();
if(interval >0&& batchSize >1){
//获取AbstractStreamOperator里面的ProcessingTimeService, 该对象用来做定时调用
//注册定时器将当前对象作为回调对象,需要实现ProcessingTimeCallback接口
processingTimeService = getProcessingTimeService();
long now = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now + interval,this);
}
}
//状态恢复
@Overridepublicvoid initializeState(StateInitializationContext context)throwsException{
super.initializeState(context);
this.list =newArrayList<T>();
listState = context.getOperatorStateStore().getSerializableListState("batch-interval-sink");
if(context.isRestored()){
listState.get().forEach(x ->{
list.add(x);
});
}
 
}
 
@Overridepublicvoid processElement(StreamRecord<T> element)throwsException{
list.add(element.getValue());
if(list.size()>= batchSize){
saveRecords(list);
}
 
}
//checkpoint
@Overridepublicvoid snapshotState(StateSnapshotContext context)throwsException{
super.snapshotState(context);
if(list.size()>0){
listState.clear();
listState.addAll(list);
}
}
//定时回调
@Overridepublicvoid onProcessingTime(long timestamp)throwsException{
if(list.size()>0){
saveRecords(list);
list.clear();
}
long now = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now + interval,this);//再次注册
}
 
publicabstractvoid saveRecords(List<T> datas);
}

如何调用? 直接使用dataStream.transform方式即可。

整体来说这个demo相对来说是比较简单的,但是这里面涉及的定时、状态管理也是值得研究,比喻说在这里定时我们直接选择ProcessingTimeService,而没有选择InternalTimerService来完成定时注册,主要是由于InternalTimerService会做定时调用状态保存,在窗口操作中需要任务失败重启仍然可以触发定时,但是在我们案例中不需要,直接下次启动重新注册即可,因此选择了ProcessingTimeService。

推荐阅读

1.  Flink中延时调用设计与实现

2. F link维表关联系列之Hbase维表关联:LRU策略

3.  你应该了解的Watermark

4. Flink exactly-once系列之事务性输出实现

5. F link时间系统系列之实例讲解:如何做定时输出

6.  Flink实战:全局TopN分析与实现

7.  Flink per-Job模式InfluxdbReporter上报JobName

8.  Flink SQL自定义聚合函数

关注回复 Flink 获取更多信息~

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章