Flink 状态管理与 Checkpoint 机制

点击上方“ zhisheng ”,选择“ 设为星标

一、状态分类

相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用:

具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State。

1.1 算子状态

算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方文档上对 Operator State 的解释是:each operator state is bound to one parallel operator instance,所以更为确切的说一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态:

1.2 键控状态

键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。如下图所示,每个颜色代表不同 key 值,对应四个不同的状态实例。需要注意的是键控状态只能在 KeyedStream 上进行使用,我们可以通过 stream.keyBy(...) 来得到 KeyedStream 。

二、状态编程

2.1 键控状态

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。 ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。 ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。 AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。 FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。 MapState:维护 Map 类型的状态。

以上所有增删改查方法不必硬记,在使用时通过语法提示来调用即可。这里给出一个具体的使用示例:假设我们正在开发一个监控系统,当监控数据超过阈值一定次数后,需要发出报警信息。这里之所以要达到一定次数,是因为由于偶发原因,偶尔一次超过阈值并不能代表什么,故需要达到一定次数后才触发报警,这就需要使用到 Flink 的状态编程。相关代码如下:

publicclassThresholdWarningextends

RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {


// 通过ListState来存储非正常数据的状态

privatetransientListState<Long> abnormalData;

// 需要监控的阈值

privateLong threshold;

// 触发报警的次数

privateInteger numberOfTimes;


ThresholdWarning(Long threshold, Integer numberOfTimes) {

this.threshold = threshold;

this.numberOfTimes = numberOfTimes;

}


@Override

publicvoid open(Configuration parameters) {

// 通过状态名称(句柄)获取状态实例,如果不存在则会自动创建

abnormalData = getRuntimeContext().getListState(

newListStateDescriptor<>("abnormalData", Long.class));

}


@Override

publicvoid flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)

throwsException{

Long inputValue = value.f1;

// 如果输入值超过阈值,则记录该次不正常的数据信息

if(inputValue >= threshold) {

abnormalData.add(inputValue);

}

ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());

// 如果不正常的数据出现达到一定次数,则输出报警信息

if(list.size() >= numberOfTimes) {

out.collect(Tuple2.of(value.f0 + " 超过指定阈值 ", list));

// 报警信息输出后,清空状态

abnormalData.clear();

}

}

}


调用自定义的状态监控,这里我们使用 a,b 来代表不同类型的监控数据,分别对其数据进行监控:

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(

Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),

Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),

Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),

Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));

tuple2DataStreamSource

.keyBy(0)

.flatMap(newThresholdWarning(100L, 3)) // 超过100的阈值3次后就进行报警

.printToErr();

env.execute("Managed Keyed State");


输出如下结果如下:

2.2 状态有效期

以上任何类型的 keyed state 都支持配置有效期 (TTL) ,示例如下:

StateTtlConfig ttlConfig = StateTtlConfig

// 设置有效期为 10 秒

.newBuilder(Time.seconds(10))

// 设置有效期更新规则,这里设置为当创建和写入时,都重置其有效期到规定的10秒

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

/*设置只要值过期就不可见,另外一个可选值是ReturnExpiredIfNotCleanedUp,

代表即使值过期了,但如果还没有被物理删除,就是可见的*/

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build();

ListStateDescriptor<Long> descriptor = newListStateDescriptor<>("abnormalData", Long.class);

descriptor.enableTimeToLive(ttlConfig);


2.3 算子状态

相比于键控状态,算子状态目前支持的存储类型只有以下三种:

ListState:存储列表类型的状态。 UnionListState:存储列表类型的状态,与 ListState 的区别在于:如果并行度发生变化,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。 BroadcastState:用于广播的算子状态。

这里我们继续沿用上面的例子,假设此时我们不需要区分监控数据的类型,只要有监控数据超过阈值并达到指定的次数后,就进行报警,代码如下:


publicclassThresholdWarningextendsRichFlatMapFunction<Tuple2<String, Long>,

Tuple2<String, List<Tuple2<String, Long>>>> implementsCheckpointedFunction{


// 非正常数据

privateList<Tuple2<String, Long>> bufferedData;

// checkPointedState

privatetransientListState<Tuple2<String, Long>> checkPointedState;

// 需要监控的阈值

privateLong threshold;

// 次数

privateInteger numberOfTimes;


ThresholdWarning(Long threshold, Integer numberOfTimes) {

this.threshold = threshold;

this.numberOfTimes = numberOfTimes;

this.bufferedData = newArrayList<>();

}


@Override

publicvoid initializeState(FunctionInitializationContext context) throwsException{

// 注意这里获取的是OperatorStateStore

checkPointedState = context.getOperatorStateStore().

getListState(newListStateDescriptor<>("abnormalData",

TypeInformation.of(newTypeHint<Tuple2<String, Long>>() {

})));

// 如果发生重启,则需要从快照中将状态进行恢复

if(context.isRestored()) {

for(Tuple2<String, Long> element : checkPointedState.get()) {

bufferedData.add(element);

}

}

}


@Override

publicvoid flatMap(Tuple2<String, Long> value,

Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {

Long inputValue = value.f1;

// 超过阈值则进行记录

if(inputValue >= threshold) {

bufferedData.add(value);

}

// 超过指定次数则输出报警信息

if(bufferedData.size() >= numberOfTimes) {

// 顺便输出状态实例的hashcode

out.collect(Tuple2.of(checkPointedState.hashCode() + "阈值警报!", bufferedData));

bufferedData.clear();

}

}


@Override

publicvoid snapshotState(FunctionSnapshotContext context) throwsException{

// 在进行快照时,将数据存储到checkPointedState

checkPointedState.clear();

for(Tuple2<String, Long> element : bufferedData) {

checkPointedState.add(element);

}

}

}


调用自定义算子状态,这里需要将并行度设置为 1:

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 开启检查点机制

env.enableCheckpointing(1000);

// 设置并行度为1

DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(

Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),

Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),

Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),

Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));

tuple2DataStreamSource

.flatMap(newThresholdWarning(100L, 3))

.printToErr();

env.execute("Managed Keyed State");

}


此时输出如下:

在上面的调用代码中,我们将程序的并行度设置为 1,可以看到三次输出中状态实例的 hashcode 全是一致的,证明它们都同一个状态实例。假设将并行度设置为 2,此时输出如下:

可以看到此时两次输出中状态实例的 hashcode 是不一致的,代表它们不是同一个状态实例,这也就是上文提到的,一个算子状态是与一个并发的算子实例所绑定的。同时这里只输出两次,是因为在并发处理的情况下,线程 1 可能拿到 5 个非正常值,线程 2 可能拿到 4 个非正常值,因为要大于 3 次才能输出,所以在这种情况下就会出现只输出两条记录的情况,所以需要将程序的并行度设置为 1。

三、检查点机制

3.1 CheckPoints

为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

3.2 开启检查点

默认情况下,检查点机制是关闭的,需要在程序中进行开启:

// 开启检查点机制,并指定状态检查点之间的时间间隔

env.enableCheckpointing(1000);


// 其他可选配置如下:

// 设置语义

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置两个检查点之间的最小时间间隔

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// 设置执行Checkpoint操作时的超时时间

env.getCheckpointConfig().setCheckpointTimeout(60000);

// 设置最大并发执行的检查点的数量

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 将检查点持久化到外部存储

env.getCheckpointConfig().enableExternalizedCheckpoints(

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 如果有更近的保存点时,是否将作业回退到该检查点

env.getCheckpointConfig().setPreferCheckpointForRecovery(true);


3.3 保存点机制

保存点机制 (Savepoints) 是检查点机制的一种特殊的实现,它允许你通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。示例如下:

触发指定id的作业的Savepoint,并将结果存储到指定目录下

bin/flink savepoint :jobId [:targetDirectory] 复制代码更多命令和配置可以参考官方文档:savepoints

四、状态后端

4.1 状态管理器分类

默认情况下,所有的状态都存储在 JVM 的堆内存中,在状态数据过多的情况下,这种方式很有可能导致内存溢出,因此 Flink 该提供了其它方式来存储状态数据,这些存储方式统一称为状态后端 (或状态管理器):

主要有以下三种:

MemoryStateBackend

默认的方式,即基于 JVM 的堆内存进行存储,主要适用于本地开发和调试。

FsStateBackend

基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

RocksDBStateBackend

RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低,但比起全文件系统的方式,其读取速率更快;比起全内存的方式,其存储空间更大,因此它是一种比较均衡的方案。

4.2 配置方式

Flink 支持使用两种方式来配置后端管理器:第一种方式:基于代码方式进行配置,只对当前作业生效:

// 配置 FsStateBackend

env.setStateBackend(newFsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

// 配置 RocksDBStateBackend

env.setStateBackend(newRocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));


配置 RocksDBStateBackend 时,需要额外导入下面的依赖:


<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-statebackend-rocksdb_2.11</artifactId>

<version>1.9.0</version>

</dependency>


第二种方式:基于 flink-conf.yaml 配置文件的方式进行配置,对所有部署在该集群上的作业都生效:

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

参考资料

Working with State Checkpointing Savepoints State Backends Fabian Hueske , Vasiliki Kalavri . 《Stream Processing with Apache Flink》. O'Reilly Media . 2019-4-30

本文转自: https://juejin.im/post/5dd2661cf265da0bf175d5bb

如果觉得文章对你有帮助, 请转发朋友圈、点在看 ,让更多人获益,感谢您的支持!

END

关注我

公众号 ( zhisheng ) 里回复   面经、 ES、 Flink、   Spring、 Java、 Kafka、 监控  等关键字 可以查看更多关键字对应的文章。

Flink 源码解析

知识星球里面可以看到下面文章

Flink 系列文章

美团点评基于 Apache Flink 的实时数仓平台实践

基于 Flink 的大规模准实时数据分析平台实践

Spark/Flink广播实现作业配置动态更新

Flink 全链路端到端延迟的测量方法

基于Kafka+Flink+Redis的电商大屏实时计算案例

Flink SQL 如何实现数据流的 Join?

基于 Flink 构建关联分析引擎的挑战和实践

Flink Forward Asia 2019 会议视频和 PPT 全部放出

基于 Apache Flink 的大规模准实时数据分析平台

Flink Forward Asia 2019 会议所有 PPT 下载

Flink 在小红书推荐系统中的应用

Flink on YARN 常见问题与排查思路

Flink 单并行度内使用多线程来提高作业性能

一张图轻松掌握 Flink on YARN 基础架构与启动流程

详解 Flink Metrics 原理与监控实战

分享一些 Flink 专栏的真实反馈

好评如潮的 Flink 专栏,双十一买买买

阿里巴巴主推的 Flink 为什么这么火?

全网第一个 Flink 专栏—— Flink 实战与性能优化

Apache Flink 管理大型状态之增量 Checkpoint 详解

Flink on Yarn / K8s 原理剖析及实践

Flink Checkpoint 原理剖析与应用实践

吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比

深入理解 Flink 容错机制

Flink 实时写入数据到 ElasticSearch 性能调优

Flink中资源管理机制解读与展望

一文彻底搞懂 Flink 网络流控与反压机制

如何使用 Kubernetes 部署 Flink 应用

Flink 实战 | 贝壳找房基于Flink的实时平台建设

Flink Back Pressure(背压)是怎么实现的? 有什么绝妙之处?

滴滴实时计算发展之路及平台架构实践

Flink Connector 深度解析

Flink 在趣头条的应用与实践

如何使用 Flink 每天实时处理百亿条日志?

基于 Flink 实现的商品实时推荐系统(附源码)

一文让你彻底了解大数据实时计算引擎 Flink

修改代码150万行! Apache Flink 1.9.0做了这些重大修改!

Flink 从0到1学习 —— 如何使用 Side Output 来分流?

看过来,一大波热腾腾的实时计算岗位袭来

你公司到底需不需要引入实时计算引擎?

一文搞懂 Flink 的 Exactly Once 和 At Least Once

Flink 灵魂两百问,这谁顶得住?

美团点评基于 Flink 的实时数仓建设实践

如何基于Flink+TensorFlow打造实时智能异常检测平台? 只看这一篇就够了

Apache Flink 1.9 重大特性提前解读

360深度实践: Flink与Storm协议级对比

Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文

Flink 不可以连续 Split(分流)?

Flink 中这样管理配置,你知道?

Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

Apache Flink 是如何管理好内存的?

原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

Flink状态管理和容错机制介绍

流计算框架 Flink 与 Storm 的性能对比

OPPO数据中台之基石: 基于Flink SQL构建实数据仓库

为什么说流处理即未来?

《Flink 源码解析》—— 源码编译运行

大数据“重磅炸弹”——实时计算框架 Flink

《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了?

Blink 真香

《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

《从0到1学习Flink》—— Flink JobManager 高可用性配置

《从0到1学习Flink》—— Flink 写入数据到 Kafka

《从0到1学习Flink》—— Flink 项目如何运行?

《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

《从0到1学习Flink》—— Flink 中几种 Time 详解

《从0到1学习Flink》—— 介绍Flink中的Stream Windows

《从0到1学习Flink》—— Flink Data transformation(转换)

《从0到1学习Flink》—— 如何自定义 Data Sink ?

《从0到1学习Flink》—— 如何自定义 Data Source ?

《从0到1学习Flink》—— Data Sink 介绍

《从0到1学习Flink》—— Data Source 介绍

《从0到1学习Flink》—— Flink 配置文件详解

《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

《从0到1学习Flink》—— Apache Flink 介绍

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章