秒懂 Flink 状态 State(上篇)

目录

  • Overview (概述)

  • Working with State (带状态的工作)

  • The Broadcast State Pattern (广播状态模式)

  • Checkpointing

  • Queryable State (可查询状态 )

  • State Backends (状态后端)

  • State Schema Evolution (状态模式演变)

  • Custom State Serialization (自定义状态序列化)

一、Overview (概述)

有状态的函数和 Operators 在各个 元素/事件 的处理中存储数据,使状态成为任何类型的更精细操作的关键构建部分。

例如:

  • 当应用程序搜索某些事件模式时,状态将存储到目前为止遇到的事件序列。

  • 在每 minute/hour/day 聚合事件时,状态保存待处理的聚合。

  • 当在数据点流上训练机器学习模型时,状态保持模型参数的当前版本。

  • 当需要管理历史数据时,状态允许有效访问过去发生的事件。

意识到Flink需要状态,为了使状态容错使用了 Checkpoint,并允许流式应用程序使用savepoints。

有关状态的知识还允许重新调整Flink应用程序,这意味着Flink负责跨并行实例重新分配状态。

Flink的可查询状态功能允许您在运行时从Flink外部访问状态。

在使用 state 时,阅读 Flink 的状态后端机制可能也很有用。Flink提供了不同的状态后端机制,用于指定状态的存储方式和位置。State可以位于Java的堆上或堆外。根据您的状态后端,Flink还可以管理应用程序的状态,这意味着Flink处理内存管理(如果需要可能会溢出到磁盘)以允许应用程序保持非常大的状态。可以在不更改应用程序逻辑的情况下配置状态后端。

1.1 下一步去哪儿?

  • Working with State:显示如何在 Flink 应用程序中使用状态并解释不同类型的状态。

  • The Broadcast State Pattern:说明如何将广播流与非广播流连接,并使用状态在它们之间交换信息。

  • Checkpointing:描述如何启用和配置容错检查点。

  • Queryable State:说明如何在运行时从Flink外部访问状态。

  • State Schema Evolution:显示状态类型的模式是如何演变的。

  • Custom Serialization for Managed State:讨论如何实现自定义序列化程序,尤其是模式演变。

二、Working with State (带状态的工作)

本节目录
Keyed State and Operator State(Keyed状态和Operator状态)
Keyed State (Keyed状态)
Operator State (Operator状态)
Raw and Managed State(Raw和状态管理)
Using Managed Keyed State(使用Managed Keyed状态)
State Time-To-Live (TTL)(状态存活时间)
State in the Scala DataStream API
Using Managed Operator State (使用Managed Operator状态)
Stateful Source Functions(有状态的Source函数)

2.1 Keyed State and Operator State(Keyed状态和Operator状态)

Flink中有两种基本的状态: Keyed State 和  Operator State

2.1.1 Keyed State (Keyed状态)

Keyed State 始终与键相关,只能在 KeyedStream 上的函数和 operators 中使用。

您可以将 Keyed State 视为已分区或分片的 Operator State,每个 key 只有一个状态分区。每个 keyed-state 在逻辑上绑定到 <parallel-operator-instance,key> 的唯一组合模式,并且由于每个键“属于” keyed operator 的一个并行实例,我们可以将其简单地视为  <operator, key>

Keyed State 进一步组织成所谓的 Key Groups 。Key Groups 是 Flink 可以重新分配 Keyed State 的原子单元; Key Groups 与定义的最大并行度完全一样多。在执行期间,keyed operator 的每个并行实例都使用一个或多个 Key Groups 的 key。

2.1.2 Operator State (Operator状态)

使用 Operator State(或non-keyed state),每个 operator state 都绑定到一个并行运算符实例。 Kafka Connector 是在 Flink 中使用 Operator State 的一个很好的激励示例。Kafka consumer 的每个并行实例都将 topic 分区和偏移的映射维护为其 Operator State。

Operator State 接口支持在并行性更改时在并行 operator 实例之间重新分配状态。进行此重新分配可以有不同的方案。

2.2 Raw and Managed State(Raw和状态管理)

Keyed State 和 Operator State 有两种形式:managed 和 raw。

Managed State 由 Flink 运行时控制的数据结构表示,例如内部哈希表或RocksDB。例如“ValueState”, “ListState”等。Flink的运行时对状态进行编码并将它们写入checkpoint。

Raw State 是 operators 保留在自己的数据结构中的状态。当checkpointe时,它们只会将一个字节序列写入checkpoint。Flink对状态的数据结构一无所知,只看到原始字节。

所有数据流功能都可以使用 managed state,但 raw state 接口只能在实现operator使用。建议使用managed state(而不是raw state),因为在raw state下,Flink能够在并行性更改时自动重新分配状态,并且还可以进行更好的内存管理。

注意 如果您的managed state需要自定义序列化逻辑,请参阅相应的指南以确保将来的兼容性。Flink的默认序列化器不需要特殊处理。

2.3 Using Managed Keyed State(使用Managed Keyed状态)

managed keyed state 接口提供对不同类型状态的访问,这些状态都限定为当前输入元素的key。这意味着这种类型的状态只能在KeyedStream上使用,KeyedStream可以通过 stream.keyBy(…) 创建。

现在,我们将首先查看可用的不同类型的状态,然后我们将看到它们如何在程序中使用。可用的状态原语是:

  • ValueState<T> :这保留了一个可以更新和检索的值(如上所述,作用于输入元素的key的范围,因此运算看到的每个key可能有一个值)。可以使用 update(T) 设置该值,并使用 T value() 检索该值。

  • ListState<T> :保留的元素列表。您可以追加元素并在所有当前存储的元素上检索 Iterable 。使用 add(T)addAll(List<T>) 添加元素,可以使用 Iterable<T> get() 检索Iterable。您还可以使用 update(List<T>) 重写现有的list。

  • ReducingState<T> :保留一个值,表示添加到状态的所有值的聚合。该接口类似于ListState,不过是使用 add(T) 添加的元素,reduced一个聚合是使用特定的 ReduceFunction

  • AggregatingState<IN, OUT> :保留一个值表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState相同,不过是使用 add(T) 添加的元素,reduced一个聚合是使用特定的 AggregateFunction

  • FoldingState<T, ACC> :保留一个值表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。该接口类似于ListState,不过是使用 add(T) 添加的元素,reduced一个聚合是使用特定的 FoldFunction

  • MapState<UK, UV> :保留一个映射列表。您可以将键值对放入状态,并在所有当前存储的映射上检索Iterable。使用 put(UK, UV)putAll(Map<UK, UV>) 添加映射。可以使用 get(UK) 检索与用户key关联的值。可以分别使用 entries()keys()values() 来检索映射、键和值的可迭代视图。

所有类型的状态还有一个方法 clear() ,它清除当前活动key的状态,即输入元素的key。

注意 FoldingStateFoldingStateDescriptor 已在Flink 1.4中弃用,将来会被完全删除。请改用 AggregatingStateAggregatingStateDescriptor

重要的是要牢记这些状态对象仅用于与状态接口。状态不一定存储在内部,但可能驻留在磁盘或其他位置。要记住的第二件事是,从状态获得的值取决于input元素的key。因此如果涉及的key不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同。

要获取状态句柄,您必须创建 StateDescriptor 。这保存了状态的名称(正如我们稍后将看到的,您可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型,并且可能是用户指定的函数,例如 ReduceFunction 。根据要检索的状态类型,可以创建 ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

使用 RuntimeContext 访问状态,因此只能在富函数中使用。请参阅此处了解相关信息,但我们很快也会看到一个示例。  RichFunction 中可用的 RuntimeContext 具有以下访问状态的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)

  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)

  • ListState<T> getListState(ListStateDescriptor<T>)

  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)

  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)

  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这是一个示例 FlatMapFunction ,显示所有的部分如何组合在一起:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {


private var sum: ValueState[(Long, Long)] = _


override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {


// access the state value

val tmpCurrentSum = sum.value // If it hasn't been used before, it will be null

val currentSum = if (tmpCurrentSum != null) {

tmpCurrentSum } else {

(0L, 0L)

}


// update the count

val newSum = (currentSum._1 + 1, currentSum._2 + input._2)


// update the state

sum.update(newSum)


// if the count reaches 2, emit the average and clear the state

if (newSum._1 >= 2) {

out.collect((input._1, newSum._2 / newSum._1))

sum.clear()

}

}


override def open(parameters: Configuration): Unit = {

sum = getRuntimeContext.getState(

new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])

)

}}object ExampleCountWindowAverage extends App {

val env = StreamExecutionEnvironment.getExecutionEnvironment


env.fromCollection(List(

(1L, 3L),

(1L, 5L),

(1L, 7L),

(1L, 4L),

(1L, 2L)

)).keyBy(_._1)

.flatMap(new CountWindowAverage())

.print()

// the printed output will be (1,4) and (1,5)

env.execute("ExampleManagedState")}

这个例子实现了一个 poor man’s 的计数窗口。我们通过第一个字段键入元组(在示例中都具有相同的key 1)。该函数将计数和运行总和存储在ValueState中。一旦计数达到2,它将发出平均值并清除状态,以便我们从0开始。注意,如果我们在第一个字段中有不同值的元组,这将为每个不同的输入key保持不同的状态值。

2.3.1 State Time-To-Live (TTL)(状态存活时间)

可以将存活时间(TTL)分配给任何类型的keyed state。如果配置了TTL并且状态值已过期,则将尽力清除存储的值,这将在下面更详细地讨论。

所有状态集合类型都支持 per-entry TTL。这意味着列表元素和map条目将独立到期。

为了使用状态TTL,必须首先构建 StateTtlConfig 配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL功能:

import org.apache.flink.api.common.state.StateTtlConfigimport org.apache.flink.api.common.state.ValueStateDescriptorimport org.apache.flink.api.common.time.Time


val ttlConfig = StateTtlConfig

.newBuilder(Time.seconds(1))

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build

val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])stateDescriptor.enableTimeToLive(ttlConfig)

配置有几个选项需要考虑:

newBuilder 方法的第一个参数是必需的,它是值为存活时间。

当状态存活时间刷新时更新类型配置(默认为 OnCreateAndWrite ):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅限创建和写入访问

  • StateTtlConfig.UpdateType.OnReadAndWrite - 也是读访问权限

状态可见性配置是否在读取访问时返回过期值(如果尚未清除)(默认为 NeverReturnExpired ):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 永远不会返回过期值

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用则返回

NeverReturnExpired 的情况下,过期状态表现得好像它不再存在,即使它仍然必须被删除。该选项对于在TTL之后必须严格用于读取访问的数据的用例是有用的,例如,应用程序使用隐私敏感数据。

另一个选项 ReturnExpiredIfNotCleanedUp 允许在清理之前返回过期状态。

笔记

  • 状态后端存储上次修改的时间戳以及用户值,这意味着启用此功能会增加状态存储的消耗。堆状态后端存储一个额外的Java对象,其中包含对用户状态对象的引用和内存中的原始long值。RocksDB状态后端为每个存储值,list条目或map条目添加8个字节。

  • 目前仅支持参考处理时间的TTL。

  • 尝试恢复先前未使用TTL配置的状态,使用启用TTL的描述符(反之亦然)将导致兼容性失败和 StateMigrationException

  • TTL配置不是checkpoint或savepoint的一部分,而是Flink如何在当前运行的作业中处理它的方式。

  • 仅当用户值序列化程序可以处理空值时,具有TTL的映射状态当前才支持空用户值。如果序列化程序不支持空值,则可以使用NullableSerializer包装它,代价是序列化形式的额外字节。

Cleanup of Expired State(清除过期状态)

默认情况下,只有在明确读出过期值时才会删除过期值,例如通过调用 ValueState.value()

注意 这意味着默认情况下,如果未读取过期状态,则不会删除它,可能会导致状态不断增长。这可能在将来的版本中发生变化

清除完整快照

此外,您可以在获取完整状态快照时激活清理,这将减小其大小。在当前实现下不清除本地状态,但在从上一个快照恢复的情况下,它不会包括已删除的过期状态。它可以在 StateTtlConfig 中配置:

import org.apache.flink.api.common.state.StateTtlConfigimport org.apache.flink.api.common.time.Time


val ttlConfig = StateTtlConfig

.newBuilder(Time.seconds(1))

.cleanupFullSnapshot

.build

此选项不适用于RocksDB状态后端中的增量Checkpoint。

笔记:

  • 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如, 从savepoint重启后。

Cleanup in background (在后台清除)

除了在完整快照中清理外,您还可以在后台激活清理。如果使用的后端支持以下选项,则会激活 StateTtlConfig 中的默认后台清理:

import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig

.newBuilder(Time.seconds(1))

.cleanupInBackground

.build

要在后台对某些特殊清理进行更精细的控制,可以按照下面的说明单独配置它。目前,堆状态后端依赖于增量清理,RocksDB后端使用压缩过滤器进行后台清理。

增量清理

另一种选择是逐步触发一些状态条目的清理。触发器可以是来自每个状态访问or/and 每个记录处理的回调。如果此清理策略对某些状态处于活动状态,则存储后端会在其所有条目上为此状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会被提前。检查遍历的状态条目,清除过期的状态条目。

可以在 StateTtlConfig 中激活此功能:

import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig

.newBuilder(Time.seconds(1))

.cleanupIncrementally

.build

该策略有两个参数。第一个是每次清理触发的已检查状态条目数。如果启用,则始终按每个状态访问触发。第二个参数定义是否每个记录处理另外触发清理。如果启用默认后台清理,则将为具有5个已检查条目的堆后端激活此策略,并且不对每个记录处理进行清理。

笔记

  • 如果状态没有访问或没有处理记录,则过期状态将持续存在。

  • 增量清理所花费的时间会增加记录处理延迟。

  • 目前,仅针对堆状态后端实施增量清理。为RocksDB设置它将不起作用。

  • 如果堆状态后端与同步快照一起使用,则全局迭代器会在迭代时保留所有key的副本,因为它的特定实现不支持并发修改。启用此功能会增加内存消耗。异步快照没有此问题。

  • 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如,从savepoint重启后。

在RocksDB压缩期间进行清理

如果使用RocksDB状态后端,则另一种清理策略是激活Flink特定的压缩过滤器。RocksDB定期运行异步压缩以合并状态更新并减少存储。Flink压缩过滤器使用TTL检查状态条目的到期时间戳,并排除过期值。

默认情况下禁用此功能。必须首先通过设置Flink配置选项 state.backend.rocksdb.ttl.compaction.filter.enabled 或通过调用 RocksDBStateBackend::enableTtlCompactionFilter 为RocksDB后端激活它,如果为作业创建了自定义RocksDB状态后端。然后可以将任何具有TTL的状态配置为使用过滤器:

import org.apache.flink.api.common.state.StateTtlConfig


val ttlConfig = StateTtlConfig

.newBuilder(Time.seconds(1))

.cleanupInRocksdbCompactFilter(1000)

.build

每次处理一定数量的状态条目后,RocksDB压缩过滤器将从Flink查询用于检查过期的当前时间戳。您可以更改它并将自定义值传递给 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法。更频繁地更新时间戳可以提高清理速度,但是它会降低压缩性能,因为它使用来自本机代码的JNI调用。如果启用默认后台清理,则将为RocksDB后端激活此策略,并且每次处理1000个条目时将查询当前时间戳。

您可以通过激活 FlinkCompactionFilter 的调试级别来激活RocksDB过滤器的本机代码中的调试日志:  log4j.logger.org.rocksdb.FlinkCompactionFilter= DEBUG

笔记

  • 在压缩期间调用TTL过滤器会减慢它的速度。TTL过滤器必须解析上次访问的时间戳,并检查每个正在压缩的key的每个存储状态条目的到期时间。在收集状态类型(list或map)的情况下,还对每个存储的元素调用检查。

  • 如果此功能与包含非固定字节长度的元素的列表状态一起使用,则本机TTL过滤器必须每个状态条目另外调用JNI上元素的Flink java类型序列化程序,其中至少第一个元素已到期确定下一个未到期元素的偏移量。

  • 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从savepoint重启后。

2.3.2 State in the Scala DataStream API

除了上面描述的接口之外,Scala API还具有有状态 map()flatMap() 函数的快捷方式,在KeyedStream上具有单个ValueState。用户函数在Option中获取ValueState的当前值,并且必须返回将用于更新状态的更新值。

val stream: DataStream[(String, Int)] = ...val counts: DataStream[(String, Int)] = stream .keyBy(_._1)

.mapWithState((in: (String, Int), count: Option[Int]) =>

count match {

case Some(c) => ( (in._1, c), Some(c + in._2) )

case None => ( (in._1, 0), Some(in._2) )

})

2.4 Using Managed Operator State (使用Managed Operator状态)

要使用managed operator状态,有状态函数可以实现更通用的 CheckpointedFunction 接口,或者 ListCheckpointed<T extends Serializable> 接口。

CheckpointedFunction

CheckpointedFunction 接口提供对具有不同重新分发方案的非non-keyed状态的访问。它需要实现两种方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

每当必须执行checkpoint时,都会调用 snapshotState() 。每次初始化用户定义的函数时,都会调用对应的 initializeState() ,即首次初始化函数时,或者当函数实际从早期checkpoint恢复时。鉴于此 initializeState() 不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。

目前,支持列表样式的managed operator状态。该状态应该是一个可序列化对象的列表,彼此独立,因此有资格在重新缩放时重新分配。换句话说,这些对象是可以重新分配non-keyed状态的最精细的粒度。根据状态访问方法,定义了以下重新分发方案:

  • Even-split再分配 :每个运算符返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分发时,列表被平均分成与并行运算符一样多的子列表。每个运算符都获得一个子列表,该子列表可以为空,或包含一个或多个元素。例如,如果使用并行度为1,运算符的checkpoint状态包含元素element1和element2,当将并行度增加到2时,element1可能最终在operator实例0中,而element2将转到operator实例1。

  • Union重新分配 :每个运算符返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分配时,每个运算符都会获得完整的状态元素列表。

下面是一个有状态 SinkFunction 的示例,它使用 CheckpointedFunction 缓冲元素,然后再将它们发送到外部世界。它演示了基本的 Even-split再分配 列表状态:

class BufferingSink(threshold: Int = 0)

extends SinkFunction[(String, Int)]

with CheckpointedFunction {


@transient

private var checkpointedState: ListState[(String, Int)] = _ private val bufferedElements = ListBuffer[(String, Int)]()


override def invoke(value: (String, Int), context: Context): Unit = {

bufferedElements += value if (bufferedElements.size == threshold) {

for (element <- bufferedElements) {

// send it to the sink

}

bufferedElements.clear()

}

}


override def snapshotState(context: FunctionSnapshotContext): Unit = {

checkpointedState.clear()

for (element <- bufferedElements) {

checkpointedState.add(element)

}

}


override def initializeState(context: FunctionInitializationContext): Unit = {

val descriptor = new ListStateDescriptor[(String, Int)](

"buffered-elements",

TypeInformation.of(new TypeHint[(String, Int)]() {})

)


checkpointedState = context.getOperatorStateStore.getListState(descriptor)


if(context.isRestored) {

for(element <- checkpointedState.get()) {

bufferedElements += element }

}

}}

initializeState 方法将 FunctionInitializationContext 作为参数。这用于初始化non-keyed状态“containers”。这些是ListState类型的容器,其中non-keyed状态对象将在checkpoint存储。

注意状态是如何初始化的,类似于keyed状态, StateDescriptor 包含状态名称和有关状态值的类型的信息:

val descriptor = new ListStateDescriptor[(String, Long)](

"buffered-elements",

TypeInformation.of(new TypeHint[(String, Long)]() {}

))

checkpointedState = context.getOperatorStateStore.getListState(descriptor)

状态访问方法的命名约定包含其重新分发模式,后跟其状态结构。例如,要在还原时使用 联合重新分发 方案的列表状态,请使用 getUnionListState(descriptor) 访问该状态。如果方法名称不包含重新分发模式,例如  getListState(descriptor) ,它只是意味着将使用基本的even-split再分配方案。

在初始化容器之后,我们使用上下文的 isRestored() 方法来检查我们是否在失败后恢复。如果这是真的,即我们正在恢复,则应用恢复逻辑。

如修改后的 BufferingSink 的代码所示,在状态初始化期间恢复的ListState保存在类变量中以供将来在 snapshotState() 中使用。在那里,ListState被清除前一个checkpoint包含的所有对象,然后填充我们想要checkpoint的新对象。

作为旁注,keyed状态也可以在 initializeState() 方法中初始化。这可以使用提供的 FunctionInitializationContext 来完成。

ListCheckpointed

ListCheckpointed 接口是 CheckpointedFunction 的一个更有限的变体,它仅支持在恢复时具有even-split再分配方案的列表样式状态。它还需要实现两种方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

snapshotState() 上operator应该返回checkpoint的对象列表,restoreState必须在恢复时处理这样的列表。如果状态不可重新分区,则始终可以在 snapshotState() 中返回 Collections.singletonList(MY_STATE)

2.4.1 Stateful Source Functions (有状态的Source函数)

与其他operators相比,有状态的来源需要更多的关注。为了使状态和输出集合的更新成为原子性的(在故障/恢复时精确一次语义所需),用户需要从源的上下文获取锁定。

class CounterSource

extends RichParallelSourceFunction[Long]

with ListCheckpointed[Long] {


@volatile

private var isRunning = true


private var offset = 0L


override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {

val lock = ctx.getCheckpointLock while (isRunning) {

// output and state update are atomic

lock.synchronized({

ctx.collect(offset)


offset += 1

})

}

}


override def cancel(): Unit = isRunning = false


override def restoreState(state: util.List[Long]): Unit =

for (s <- state) {

offset = s }


override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =

Collections.singletonList(offset)}

当Flink完全确认checkpoint与外界通信时,某些operators可能需要这些信息。在这种情况下,请参阅 org.apache.flink.runtime.state.CheckpointListener 接口。

三、The Broadcast State Pattern (广播状态模式)

本节目录
Provided APIs (提供的API)
BroadcastProcessFunction and KeyedBroadcastProcessFunction
Important Considerations (重要考虑因素)

working with State描述operator状态,该运算符状态在恢复时均匀分布在operator的并行任务中,或者联合使用,整个状态用于初始化已恢复的并行任务。

第三种支持的operator状态是广播状态。引入广播状态是为了支持这样的用例,其中来自一个流的一些数据需要被广播到所有下游任务,其中它被本地存储并用于处理另一个流上的所有传入元素。作为一个广播状态可以自然出现的例子,可以想象包含一组规则的低吞吐量流,我们希望针对来自另一个流的所有元素进行评估。考虑到上述类型的用例,广播状态与其他operator状态的不同之处在于:

  • 它有一个Map格式,

  • 它仅适用于具有广播流和非广播流的输入的特定operators

  • 这样的operators可以具有不同名称的多个广播状态。

3.1 Provided APIs (提供的API)

为了显示提供的API,我们将在展示其完整功能之前先举一个示例。作为我们的运行示例,我们将使用我们拥有不同颜色和形状的对象流的情况,并且我们想要找到遵循特定模式的相同颜色的对象对儿,例如,一个矩形后跟一个三角形。我们假设这组有趣的模式随着时间的推移而发展。

在此示例中,第一个流将包含Item类型的元素,其中包含Color和Shape属性。另一个流将包含规则。

从Items流开始,我们只需要通过Color作为key,因为我们需要相同颜色的对。这将确保相同颜色的元素最终在同一台物理机器上。

// key the shapes by color

KeyedStream<Item, Color> colorPartitionedStream = shapeStream

.keyBy(new KeySelector<Shape, Color>(){...});

继续执行规则部分,包含它们的流应该被广播到所有下游任务,并且这些任务应该在本地存储它们,以便它们可以针对所有传入的Items对它们进行评估。下面的片段将 (i)广播规则流和 (ii)使用提供的MapStateDescriptor,它将创建存储规则的广播状态。

// a map descriptor to store the name of the rule (string) and the rule itself.

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(

"RulesBroadcastState",

BasicTypeInfo.STRING_TYPE_INFO,

TypeInformation.of(new TypeHint<Rule>() {}));

// broadcast the rules and create the broadcast state

BroadcastStream<Rule> ruleBroadcastStream = ruleStream

.broadcast(ruleStateDescriptor);

最后,为了根据Item流中的传入元素评估规则,我们需要:

  • 连接两个流,然后

  • 指定我们的匹配检测逻辑。

将流(keyed或non-keyed)与BroadcastStream连接可以通过在非广播流上调用connect(),并将BroadcastStream作为参数来完成。这将返回一个BroadcastConnectedStream,我们可以使用特殊类型的CoProcessFunction调用process()。该函数将包含我们的匹配逻辑。函数的确切类型取决于非广播流的类型:

  • 如果是keyed,则该函数是KeyedBroadcastProcessFunction。

  • 如果它是non-keyed,则该函数是BroadcastProcessFunction。

鉴于我们的非广播流是keyed的,以下代码段包含以上调用:

注意 :应该在非广播流上调用connect,并将BroadcastStream作为参数。

DataStream<String> output = colorPartitionedStream .connect(ruleBroadcastStream)

.process(

// 我们的KeyedBroadcastProcessFunction中的类型参数表示:

// 1. keyed流的键

// 2. 非广播方的元素类型

// 3. 广播方的元素类型

// 4. 结果的类型, 这里是一个 string

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

// my matching logic

}

);

3.1.1 BroadcastProcessFunction and KeyedBroadcastProcessFunction

与CoProcessFunction的情况一样,这些函数有两种实现方法; processBroadcastElement() 负责处理广播流中的传入元素,和 processElement() 用于非广播流。这些方法的完整签名如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {


public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;


public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;}

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {


public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;


public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;


public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;}

首先要注意的是,这两个函数都需要实现 processBroadcastElement() 方法来处理广播端中的元素,而 processElement() 则需要非广播端的元素。

这两种方法在提供的上下文中有所不同。非广播端有一个 ReadOnlyContext ,而广播端有一个Context。

这两个上下文(在一下列举的ctx):

getBroadcastState() 中的stateDescriptor应该与上面的 .broadcast(ruleStateDescriptor) 中的stateDescriptor相同。

不同之处在于每个人对广播状态的访问类型。广播方对其具有 读写访问权限 ,而非广播方具有 只读访问权 (因这个名称)。原因是在Flink中没有跨任务通信。因此,为了保证广播状态中的内容在我们的operator的所有并行实例中是相同的,我们只对广播端提供读写访问,广播端在所有任务中看到相同的元素,并且我们需要对每个任务进行计算。这一侧的传入元素在所有任务中都是相同的。忽略此规则会破坏状态的一致性保证,从而通常导致不一致且难以调试的结果。

注意processBroadcast() 中实现的逻辑必须在所有并行实例中具有相同的确定性行为!

最后,由于 KeyedBroadcastProcessFunction 在keyed流上运行,它向外提供了一些BroadcastProcessFunction不可用的功能。那是:

注意 :只能在 KeyedBroadcastProcessFunctionprocessElement() 中注册定时器。在 processBroadcastElement() 方法中是不可能的,因为没有与广播元素相关联的key。

回到我们的原先的示例,我们的KeyedBroadcastProcessFunction可能如下所示:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {


// 保存部分的匹配项, 即等待其第二个元素pair的第一个元素

// 我们保存一个list, 因为我们可能有许多对一个元素在等待

private final MapStateDescriptor<String, List<Item>> mapStateDesc =

new MapStateDescriptor<>(

"items",

BasicTypeInfo.STRING_TYPE_INFO,

new ListTypeInfo<>(Item.class));


// identical to our ruleStateDescriptor above

private final MapStateDescriptor<String, Rule> ruleStateDescriptor =

new MapStateDescriptor<>(

"RulesBroadcastState",

BasicTypeInfo.STRING_TYPE_INFO,

TypeInformation.of(new TypeHint<Rule>() {}));


@Override

public void processBroadcastElement(Rule value,

Context ctx,

Collector<String> out) throws Exception {

ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);

}


@Override

public void processElement(Item value,

ReadOnlyContext ctx,

Collector<String> out) throws Exception {


final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);

final Shape shape = value.getShape();

for (Map.Entry<String, Rule> entry :

ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {

final String ruleName = entry.getKey();

final Rule rule = entry.getValue();

List<Item> stored = state.get(ruleName);

if (stored == null) {

stored = new ArrayList<>();

}

if (shape == rule.second && !stored.isEmpty()) {

for (Item i : stored) {

out.collect("MATCH: " + i + " - " + value);

}

stored.clear();

}

// there is no else{} to cover if rule.first == rule.second

if (shape.equals(rule.first)) {

stored.add(value);

}

if (stored.isEmpty()) {

state.remove(ruleName);

} else {

state.put(ruleName, stored);

}

}

}}

3.2 Important Considerations (重要考虑因素)

在描述了提供的API之后,本节重点介绍使用广播状态时要记住的重要事项。这些是:

  • 没有cross-task通信 : 如上所述,这就是为什么只有(Keyed)-BroadcastProcessFunction的广播端可以修改广播状态的内容。此外,用户必须确保所有任务以相同的方式为每个传入元素修改广播状态的内容。否则,不同的任务可能具有不同的内容,从而导致不一致的结果。

  • 广播状态中的事件顺序可能因任务而异 : 尽管广播流的元素保证所有元素将(最终)转到所有下游任务,但元素可能以不同的顺序到达每个任务。因此,每个传入元素的状态更新绝不取决于传入事件的顺序。

  • 所有任务都checkpoint其广播状态 : 虽然checkpoint发生时所有任务在广播状态中都具有相同的元素(checkpoint的barriers不会覆盖元素),但所有任务都会checkpoint其广播状态,而不仅仅是其中一个。这是一个设计决策,以避免在恢复期间从同一文件中读取所有任务(从而避免热点),尽管它的代价是将检查点状态的大小增加p(等于并行度)。Flink保证在restoring/rescaling时 不会出现重复 数据,也 不会丢失数据 。在具有相同或更小并行度的恢复的情况下,每个任务读取其restoring/rescaling状态。调整比例时,每个任务都读取其自己的状态,其余任务(p_new-p_old)以循环方式读取先前任务的checkpoints。

  • 没有RocksDB状态后端 : 广播状态在运行时保留在内存中,并且应该相应地进行内存配置。这适用于所有operator状态。

四、Checkpointing

本节目录
Prerequisites (要求)
Enabling and Configuring Checkpointing (启用和配置 Checkpoint)
Related Config Options (相关配置项)
Selecting a State Backend (选择状态后端)
State Checkpoints in Iterative Jobs (迭代job中的状态Checkpoint)
Restart Strategies (重启策略)

Flink中的每个函数和operator都可以是有状态的(有关详细信息请参阅working with State)。有状态函数在各个元素/事件的处理中存储数据,使状态成为任何类型的更复杂operator的关键构建部分。

为了使状态容错,Flink需要Checkpoint状态。checkpoint允许Flink恢复流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。

在流容错的文档中详细描述了Flink的流容错机制背后的技术。

4.1 Prerequisites (要求)

Flink的checkpoint机制与流和状态的持久存储交互。一般来说,它需要:

  • 持久(或持续)数据源,可以在一定时间内重放记录。这种源的示例是持久消息队列(例如,Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub)或文件系统(例如,HDFS,S3,GFS,NFS,Ceph,…)。

  • 状态的持久存储,通常是分布式文件系统(例如,HDFS,S3,GFS,NFS,Ceph,…)

4.2 Enabling and Configuring Checkpointing (启用和配置 Checkpoint)

默认情况下checkpoint是被禁用的。要启用检查点,请在StreamExecutionEnvironment上调用 enableCheckpointing(n) ,其中n是检查点间隔(以毫秒为单位)。

检查点的其他参数包括:

  • exactly-once 与 at-least-once : 您可以选择将模式传递给 enableCheckpointing(n) 方法,以便在两个保证级别之间进行选择。对于大多数应用来说,恰好一次是优选的。至少一次可能与某些超低延迟(始终为几毫秒)的应用程序相关。

  • checkpoint超时时间 : 如果在那之前没有完成,则中止正在进行的checkpoint的时间。

  • checkpoint之间的最短时间 : 为了确保流应用程序在检查点之间取得一定进展,可以定义checkpoint之间需要经过多长时间。如果将此值设置为例如5000,则无论checkpoint持续时间和checkpoint间隔如何,下一个checkpoint将在上一个checkpoint完成后的5秒内启动。请注意,这意味着checkpoint间隔永远不会小于此参数。

通过定义“checkpoint间的时间”而不是checkpoint间隔来配置应用程序通常更容易,因为“checkpoint间的时间”不易受checkpoint到有时需要的比平均时间更长的事实影响(例如,如果目标存储系统暂时很慢)。

请注意,此值还表示并发checkpoint的数量为1。

  • 并发检查点数 :默认情况下,当一个checkpoint仍处于运行状态时,系统不会触发另一个检查点。这可确保拓扑不会在checkpoint上花费太多时间,也不会在处理流方面进展。它可以允许多个重叠检查点,这对于具有特定处理延迟的管道(例如,因为函数调用需要一些时间来响应的外部服务)而有兴趣,但是仍然希望执行非常频繁的checkpoint(100毫秒)在失败时重新处理很少。

当定义checkpoint间的最短时间时,不能使用此选项。

  • 外部化checkpoint :您可以配置定期checkpoint以在外部持久化。外部化checkpoint将其元数据写入持久存储,并且在作业失败时不会自动清除。这样,如果你的工作失败,你将有一个checkpoint来恢复。有关外部化checkpoint的部署说明中有更多详细信息。

  • checkpoint错误时的任务失败/继续 :这确定如果在执行任务的checkpoint过程中发生错误,任务是否将失败。这是默认行为。或者,当禁用此选项时,任务将简单地拒绝checkpoint协调器的checkpoint并继续运行。

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 每 1000 毫秒启动一个检查点

env.enableCheckpointing(1000)

// 高级选项:

// 设置模式为 exactly-once (这也是默认的)

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 确保checkpoint之间发生500毫秒的处理

env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoint必须在1分钟内完成,否则被丢弃

env.getCheckpointConfig.setCheckpointTimeout(60000)

// 如果在checkpoint发生错误时防止任务失败,checkpoint将被拒绝。

env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// allow only one checkpoint to be in progress at the same time

env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

4.2.1 Related Config Options (相关配置项)

可以通过 conf/flink-conf.yaml 设置更多参数and/or默认值(参见完整指南的配置):

key 默认值 说明
state.backend (none) 用于存储和checkpoint状态的状态后端。
state.backend.async true 选择状态后端是否应在可能和可配置的情况下使用异步快照方法。某些状态后端可能不支持异步快照,或仅支持异步快照,时并忽略此选项。
state.backend.fs.memory-threshold 1024 状态数据文件的最小大小。小于该值的所有状态块都内联存储在根checkpoint元数据文件中。
state.backend.incremental false 如果可能,选择状态后端是否应创建增量checkpoint。对于增量checkpoint,仅存储来自先前检查点的差异,而不是完整的检查点状态。某些状态后端可能不支持增量checkpoint并忽略此选项。
state.backend.local-recovery false 此选项配置此状态后端的本地恢复。默认情况下,禁用本地恢复。本地恢复目前仅涵盖关键状态后端。目前,MemoryStateBackend不支持本地恢复并忽略此选项。
state.checkpoints.dir (none) 用于在Flink支持的文件系统中存储checkpoint的数据文件和元数据的默认目录。必须可以从所有参与的进程/节点(即所有TaskManagers和JobManagers)访问存储路径。
state.checkpoints.num-retained 1 要保留的已完成checkpoint的最大数量。
state.savepoints.dir (none) savepoint的默认目录。由将后端写入文件系统的状态后端(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)使用。
taskmanager.state.local.root-dirs (none) config参数定义根目录,用于存储基于文件的状态以进行本地恢复。本地恢复目前仅涵盖关键状态后端。目前,MemoryStateBackend不支持本地恢复并忽略此选项

4.3 Selecting a State Backend (选择状态后端)

Flink的checkpoint机制存储所有定时器和有状态operators中的状态一致快照,包括连接器、窗口和任何用户定义的状态。存储checkpoint的位置(例如,JobManager内存,文件系统,数据库)取决于配置的状态后端。

默认情况下,状态保存在TaskManagers的内存中,checkpoint存储在JobManager的内存中。为了适当持久化大状态,Flink支持在其他状态后端中存储和checkpoint状态的各种方法。可以通过 StreamExecutionEnvironment.setStateBackend(…) 配置状态后端的选择。

有关可用状态后端的详细信息以及作业范围和群集范围配置的选项,请参阅状态后端。

4.4 State Checkpoints in Iterative Jobs (迭代job中的状态Checkpoint)

Flink目前仅为没有iteration的作业提供处理保证。 在迭代作业上启用checkpoint会导致异常 。为了强制对迭代程序进行checkpoint,用户在启用checkpoint时需要设置一个特殊标志: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)

请注意,在失败期间,循环边缘中的记录(以及与它们相关的状态变化)将丢失。

4.5 Restart Strategies (重启策略)

Flink支持不同的重启策略,可以控制在发生故障时如何重新启动作业。有关更多信息,请参阅重启策略。

觉得内容还不错的话,给我点个“在看”呗

识别二维码 交流学习

我是小晨丨北漂4年

一直在从事 大数据 相关工作

欢迎 添加好友 共同交流学习

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章