Flink 源码解读:TM 端恢复及创建 OperatorState 的流程

在之前《 StreamTask 初始化流程 》的文章中,省略掉了 TM 端恢复 State 的详细过程,本文主要讲述:

  • OperatorState 的恢复和创建流程

  • Checkpoint 处恢复的 State 如何与代码中创建的 State 关联起来

一、 TM 端恢复 OperatorState 的流程

StateBackend 创建 OperatorStateBackend 时 TM 端会恢复 OperatorState。目前 Flink 支持的三种 StateBackend 都对应同一种 OperatorStateBackend,即:DefaultOperatorStateBackend,具体  new DefaultOperatorStateBackend 的过程由建造器 DefaultOperatorStateBackendBuilder 完成。

三种 StateBackend 的 createOperatorStateBackend 方法非常相似,源码如下:

public OperatorStateBackend createOperatorStateBackend(
 Environment env,
 String operatorIdentifier,
 @Nonnull Collection<OperatorStateHandle> stateHandles,
 CloseableRegistry cancelStreamRegistry) throws Exception {

 return new DefaultOperatorStateBackendBuilder(
  env.getUserClassLoader(),
  env.getExecutionConfig(),
  isUsingAsynchronousSnapshots(),
  stateHandles,
  cancelStreamRegistry).build();
}

所有的初始化流程都在 DefaultOperatorStateBackendBuilder 类的 build 方法中,build 方法源码如下所示:

@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
 AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
  new DefaultOperatorStateBackendSnapshotStrategy(XXX);
 OperatorStateRestoreOperation restoreOperation = 
    new OperatorStateRestoreOperation(XXX);
 try {
  // OperatorState 恢复流程
  restoreOperation.restore();
 } catch (Exception e) {
  IOUtils.closeQuietly(cancelStreamRegistryForBackend);
  throw new BackendBuildingException("XXX", e);
 }
 return new DefaultOperatorStateBackend(XXX);
}

build 方法中除了构造了几个对象以外,重点执行了 OperatorStateRestoreOperation 的 restore 方法,restore 方法就是恢复流程。

先介绍 OperatorStateRestoreOperation 类中两个重要的 Map:

  • registeredOperatorStates 用于保存 StateName 和 ListState 的映射关系;

  • registeredBroadcastStates 用于保存 StateName 和 BroadcastState 的映射关系

restore 源码如下所示:

// OperatorStateRestoreOperation 类中两个重要的 Map
// 保存 StateName 和 ListState 的映射关系
private final Map<String, PartitionableListState<?>> registeredOperatorStates;

// 保存 StateName 和 BroadcastState 的映射关系
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

// Operator State 真正的 restore 流程
@Override
public Void restore() throws Exception {
  // stateHandles 为空,表示没有要恢复的 State
 if (stateHandles.isEmpty()) {
  return null;
 }

 // 遍历所有 stateHandles
 for (OperatorStateHandle stateHandle : stateHandles) {
  // 通过 stateHandle 可以获取 InputStream 读取数据
  FSDataInputStream in = stateHandle.openInputStream();
  try {
   List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
    backendSerializationProxy.getOperatorStateMetaInfoSnapshots();

   // 从元数据中创建 PartitionableListStates,并没有恢复真正的 State
   for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
    final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
     new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
        
    // registeredOperatorStates 中维护的 StateName 与 ListState 的映射关系
    PartitionableListState<?> listState = registeredOperatorStates
          .get(restoredSnapshot.getName());
        
    // listState == null 表示当前 State 还未创建,则创建,并保存到 map 中
    if (null == listState) {
     // 这里只是依赖 MetaInfo 创建了 PartitionableListState,并没有恢复真正的 State 数据
     listState = new PartitionableListState<>(restoredMetaInfo);
     // 创建出的 State 数据 put 到 registeredOperatorStates 中
     registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
    }
   }

   // 真正恢复 State 的操作
   for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
    stateHandle.getStateNameToPartitionOffsets().entrySet()) {
    final String stateName = nameToOffsets.getKey();
        // 通过 StateName 从 registeredOperatorStates 中获取 ListState
        // 因为之前已经根据元数据创建了 State,
        // 所以这里 get 不到,只能是因为当前的 StateName 属于 BroadcastState
    PartitionableListState<?> listStateForName = 
            registeredOperatorStates.get(stateName);
        
    // listState 为 null,表示恢复 Broadcast 相关的 State
    if (listStateForName == null) {
     BackendWritableBroadcastState<?, ?> broadcastStateForName = 
            registeredBroadcastStates.get(stateName);
     deserializeBroadcastStateValues(broadcastStateForName, 
                                          in, nameToOffsets.getValue());
    } else {
     // 恢复 ListState,将恢复出来的元素 add 到 ListState 中
     deserializeOperatorStateValues(listStateForName, 
                                         in, nameToOffsets.getValue());
    }
   }
  } finally {
   Thread.currentThread().setContextClassLoader(restoreClassLoader);
   if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
    IOUtils.closeQuietly(in);
   }
  }
 }
 return null;
}

restore 方法中拿到的就是 JM 分配给当前 subtask 的 stateHandles,如果 stateHandles 为空表示没有要恢复的 State 则直接返回 null,可能是因为任务是直接启动,而不是从 Checkpoint 处恢复。否则 stateHandles 不为空的情况,就遍历一个个 OperatorStateHandle,通过 stateHandle 可以获取 InputStream 读取数据。

首先读出元数据,用于创建 PartitionableListState,并没有真正恢复 State 数据,PartitionableListState 是 OperatorState 对 ListState 的具体实现。ListState 维护在 registeredOperatorStates 这个 Map 中,通过 StateName 从 registeredOperatorStates 中 get,get 不到时,通过元数据创建 State,并存放在 registeredOperatorStates 中。

代码中省略了 BroadcastState 的创建流程,整体流程与 ListState 流程类似,只不过 BroadcastState 维护在 registeredBroadcastStates 中。

最后真正的恢复 State 数据,对于 ListState 而言将恢复出来的元素 add 到 ListState 中。恢复 State 数据的过程其实用反序列化器对状态数据反序列化生成对象的过程。反序列化器维护在 PartitionableListState 的元数据中。

到这里 OperatorState 就恢复完成,此时映射关系已经保存到 OperatorStateRestoreOperation 类的两个 Map 集合中。现在又回到 DefaultOperatorStateBackendBuilder 类的 build 方法中,就会发现其实这两个 Map 是好多地方共享的。这里再贴一下 build 方法的完整源码,重点关注两个 Map:

@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
 // 保存 StateName 和 ListState 的映射关系
 Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
 // 保存 StateName 和 BroadcastState 的映射关系
 Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates = 
    new HashMap<>();

 CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
  
 OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
  cancelStreamRegistry,
  userClassloader,
  // 将两个 Map 传递进去,即:restore 过程中,映射关系会存储在这两个 Map 中
  registeredOperatorStates,
  registeredBroadcastStates,
  restoreStateHandles
 );
 try {
  // OperatorState 恢复流程
  restoreOperation.restore();
 } catch (Exception e) {
  IOUtils.closeQuietly(cancelStreamRegistryForBackend);
  throw new BackendBuildingException("XXX", e);
 }
 AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
  new DefaultOperatorStateBackendSnapshotStrategy(
   userClassloader,
   asynchronousSnapshots,
   // 将两个 Map 传递给 DefaultOperatorStateBackendSnapshotStrategy
   registeredOperatorStates,
   registeredBroadcastStates,
   cancelStreamRegistryForBackend);
  
 return new DefaultOperatorStateBackend(
  executionConfig,
  cancelStreamRegistryForBackend,
  // 再将两个 Map 传递给 DefaultOperatorStateBackend
  registeredOperatorStates,
  registeredBroadcastStates,
  new HashMap<>(),
  new HashMap<>(),
  snapshotStrategy
 );
}

可以看到 build 方法刚开始会 new 两个 Map,然后传递给了 OperatorStateRestoreOperation,之后 OperatorStateRestoreOperation 的 restore 流程(也就是上述分析的恢复流程)实际上将 Checkpoint 中恢复出来的映射关系保存到了这两个 Map 中。之后两个 Map 又传递给了 DefaultOperatorStateBackendSnapshotStrategy 和 DefaultOperatorStateBackend。

所以得出结论:DefaultOperatorStateBackend 中持有从 Checkpoint 处恢复出来的 StateName 与具体 State 的映射关系。

到这里 DefaultOperatorStateBackend 就创建完成了,同时留两个问题:

  • 上面流程虽然将 OperatorState 从 Checkpoint 中恢复了,但用户在算子中创建的 State 如何与 Checkpoint 中恢复的 OperatorState 关联起来呢?

  • 另外对于直接启动,不从 Checkpoint 处恢复的任务,OperatorState 又是如何创建出来的?

带着这两个问题阅读下面流程。

二、 用户定义的 OperatorState 创建流程

Flink 源码中最典型的使用 OperatorState 的场景就是 FlinkConsumer 使用 ListState 去维护 Kafka 的 offset 信息,所以本文就从这块源码入手,看一下这个 ListState 创建流程。

FlinkKafkaConsumerBase 类的 initializeState 方法中用到了 getUnionListState 创建一个 ListState,简洁版源码如下所示:

@Override
public final void initializeState(FunctionInitializationContext context) {
 OperatorStateStore stateStore = context.getOperatorStateStore();
 unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
   OFFSETS_STATE_NAME,
   TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
}

这里调用的 OperatorStateStore 的 getUnionListState 方法。OperatorStateStore 是个接口,它只有一个实现类,就是前面创建出来的 DefaultOperatorStateBackend。所以这里会调用 DefaultOperatorStateBackend 类的 getUnionListState 方法。不过 DefaultOperatorStateBackend 中还有一个 getListState(ListStateDescriptor stateDescriptor) 方法,这也就是 OperatorState 类型的 ListState 两种获取方式。可以看一下这两个方法的源码:

@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
 return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}

@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
 return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}

源码中可以看到,无论业务使用的是 getListState 还是 getUnionListState 方法获取 ListState ,最后都会调用同一个方法,即:getListState(ListStateDescriptor stateDescriptor, OperatorStateHandle.Mode mode)。加了一个参数 OperatorStateHandle.Mode 用于区分 OperatorState 的模式:

  • getListState 对应 SPLIT_DISTRIBUTE 模式

  • getUnionListState 对应 UNION 模式

getListState(stateDescriptor, mode) 方法源码如下所示:

// 无论是 getListState 还是 getUnionListState 方法都会调用这里,
// 只不过传递的 Mode 参数不同而已
private <S> ListState<S> getListState(
  ListStateDescriptor<S> stateDescriptor,
  OperatorStateHandle.Mode mode) throws StateMigrationException {

 String name = Preconditions.checkNotNull(stateDescriptor.getName());

 TypeSerializer<S> partitionStateSerializer = 
    Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

 PartitionableListState<S> partitionableListState = (PartitionableListState<S>) 
    registeredOperatorStates.get(name);

 // registeredOperatorStates 中维护的是 Checkpoint 中恢复的 StateName 和 ListState 的映射关系
 // 如果 partitionableListState == null 表示从 Checkpoint 中没有恢复出这个 State,
 // 即:这是一个新的 State,则新建一个 PartitionableListState,并维护在 Map 中
 if (null == partitionableListState) {
  partitionableListState = new PartitionableListState<>(
   new RegisteredOperatorStateBackendMetaInfo<>(
    name,
    partitionStateSerializer,
    mode));

  registeredOperatorStates.put(name, partitionableListState);
 } else {
  // State 已经从 Checkpoint 中恢复了,检查兼容性问题
  // 这里会检查 StateName 和 AssignmentMode 是否可以匹配
  checkStateNameAndMode(
    partitionableListState.getStateMetaInfo().getName(),
    name,
    partitionableListState.getStateMetaInfo().getAssignmentMode(),
    mode);

  RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo =
   partitionableListState.getStateMetaInfo();

  // 检查 序列化是否兼容
  TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

  TypeSerializerSchemaCompatibility<S> stateCompatibility =
   restoredPartitionableListStateMetaInfo.
      updatePartitionStateSerializer(newPartitionStateSerializer);
  //  不兼容,则抛出异常
  if (stateCompatibility.isIncompatible()) {
   throw new StateMigrationException("XXX.");
  }
  partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
 }

 accessedStatesByName.put(name, partitionableListState);
 // 返回 State
 return partitionableListState;
}

getListState(stateDescriptor, mode) 方法首先通过 name 从 registeredOperatorStates 中 get 对应的 ListState 保存到 partitionableListState 中,registeredOperatorStates 维护的是 Checkpoint 中恢复的 StateName 和 ListState 的映射关系。所以 partitionableListState == null 表示从 Checkpoint 中没有恢复出这个 State,即:这是一个新的 State,所以新建一个 PartitionableListState,并保存在 registeredOperatorStates 中。

反之,partitionableListState != null 表示 State 已经从 Checkpoint 中恢复了,开始检查兼容性,首先会检查 Checkpoint 中恢复的 State 和用户新申请的 StateName 和 AssignmentMode 是否可以匹配。

  • StateName 和 name 肯定是匹配的,因为 partitionableListState 是根据 name get 出来的。

  • AssignmentMode 枚举用于区分应用层使用的 getListState 恢复还是 getUnionListState 恢复,getListState 表示 SPLIT_DISTRIBUTE 模式,getUnionListState 表示 UNION 模式。如果 State 中存储的是 SPLIT_DISTRIBUTE 模式,但任务恢复时,代码改成了 getUnionListState,实际上 State 不能正常恢复的。

StateName 和 AssignmentMode 检查完毕后,会检查序列化是否兼容,不兼容,则抛出异常。兼容则会返回 State。

上述流程就回答了最开始提的两个问题:

  1. OperatorState 从 Checkpoint 中恢复后,用户在算子中创建的 State 如何与 Checkpoint 中恢复的 OperatorState 关联起来呢?

    答:依赖 registeredOperatorStates 这个 Map 维护了 StateName 和 ListState 的映射关系,用户创建 State 是通过 StateName 从 registeredOperatorStates 中查找,如果能找到,对其进行兼容性检查,检查通过就会返回从 Checkpoint 中恢复的 ListState,从而完成了关联。

  2. 对于直接启动,不从 Checkpoint 处恢复的任务,OperatorState 又是如何创建出来的?

    答:对于直接启动的任务,registeredOperatorStates 肯定是空的。创建 State 时,从 registeredOperatorStates 中 get 不到,所以就创建一个新的 PartitionableListState,并保存在 registeredOperatorStates 中。

到这里,OperatorState 就完成了恢复,且用户的 State 也正常的创建出来了。

三、总结

文中首先介绍了 OperatorState 的恢复和创建流程,并介绍从 Checkpoint 处恢复的 State 如何与代码中创建的 State 关联起来的。后续将会详细介绍 KeyedState 的恢复创建流程以及如何将 Checkpoint 处恢复的 State 如何与代码中创建的 State 关联起来。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章