Source Code Analysis of the Checkpoint Alignment Mechanism


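Checkpointing is the core mechanism behind Flink's fault-tolerant state. Depending on how checkpoints are taken, Flink provides one of two processing guarantees: Exactly-Once or At-Least-Once. The difference comes down to checkpoint barrier alignment: with alignment, Flink provides Exactly-Once semantics; without alignment, it provides At-Least-Once semantics.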
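To make the link between alignment and the two guarantees concrete, here is a minimal toy model in plain Java. It is not Flink code: the class name, the "B" barrier marker, and the assumption that one channel is always drained before the other are all invented for illustration. It only shows which records end up inside the snapshot in each mode.

```java
import java.util.Arrays;
import java.util.List;

// Toy model, not Flink code: an operator that counts records from two inputs.
// The string "B" stands for a checkpoint barrier; anything else is a record.
class AlignmentSemanticsDemo {

    // Returns the record count captured in the snapshot.
    // Simplifying assumption: the fast channel is fully drained before the
    // slow one, so its barrier always arrives first.
    static int snapshotCount(List<String> fast, List<String> slow, boolean align) {
        int count = 0;
        boolean barrierSeenOnFast = false;
        for (String e : fast) {
            if (e.equals("B")) {
                barrierSeenOnFast = true;
            } else if (!barrierSeenOnFast || !align) {
                count++; // processed before the snapshot is taken
            }
            // otherwise: aligned mode holds the record back until after the snapshot
        }
        for (String e : slow) {
            if (e.equals("B")) {
                return count; // barriers from both inputs seen: take the snapshot
            }
            count++;
        }
        throw new IllegalArgumentException("slow channel carries no barrier");
    }

    public static void main(String[] args) {
        List<String> fast = Arrays.asList("a", "b", "B", "c", "d");
        List<String> slow = Arrays.asList("x", "B", "y");
        System.out.println("aligned snapshot:     " + snapshotCount(fast, slow, true));
        System.out.println("non-aligned snapshot: " + snapshotCount(fast, slow, false));
    }
}
```

With these inputs the aligned snapshot holds 3 (a, b, x: exactly the records ahead of the barriers), while the non-aligned snapshot holds 5 because c and d were already counted. If the sources replay from the barrier position after a failure, c and d are processed a second time on top of a state that already includes them, which is the At-Least-Once behaviour.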


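Alignment is needed in operators that receive several upstream input streams, for example after a keyBy or in a join. The rest of this article walks through how alignment is implemented in the source code.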

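Checkpoint barriers are handled in StreamInputProcessor/StreamTwoInputProcessor. These classes read data from the remote side and hand it to the StreamOperator for processing. The reads themselves go through a CheckpointBarrierHandler, which also takes care of alignment in its getNextNonBlocked method. The interface has two implementations, BarrierBuffer and BarrierTracker: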

// Called by StreamInputProcessor/StreamTwoInputProcessor to create the CheckpointBarrierHandler
public static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamTask<?, ?> checkpointedTask,
        CheckpointingMode checkpointMode,
        IOManager ioManager,
        InputGate inputGate,
        Configuration taskManagerConfig) throws IOException {

    CheckpointBarrierHandler barrierHandler;
    if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
        long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
        if (!(maxAlign == -1 || maxAlign > 0)) {
            throw new IllegalConfigurationException(
                TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
                + " must be positive or -1 (infinite)");
        }
        if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
            barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
        } else {
            barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
        }
    } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
        barrierHandler = new BarrierTracker(inputGate);
    } else {
        throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }

    if (checkpointedTask != null) {
        barrierHandler.registerCheckpointEventHandler(checkpointedTask);
    }
    return barrierHandler;
}

As this shows, BarrierBuffer implements the aligned mode and BarrierTracker implements the non-aligned mode.

Aligned - BarrierBuffer

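BarrierBuffer has two member variables that matter for alignment: bufferBlocker, of type BufferBlocker, and blockedChannels, a boolean array. The BufferBlocker holds the data that arrives while alignment is in progress (the CachedBufferBlocker implementation keeps it in an ArrayDeque), and blockedChannels records whether each channel is currently blocked for alignment.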

The alignment flow is driven by getNextNonBlocked:

@Override
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {

        // ...
        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // the channel is blocked for alignment: add the data
            // to the buffer, i.e. the BufferBlocker
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            // a data buffer from a non-blocked channel is returned directly
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // handle the CheckpointBarrier event
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        // ...
    }
}

The processBarrier method:

private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    // barrierId is the checkpoint ID carried by this barrier
    final long barrierId = receivedBarrier.getId();
    // single input channel: trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }
    // multiple input channels: numBarriersReceived is the number of channels
    // that have already delivered a barrier for the current checkpoint ID;
    // numBarriersReceived > 0 means an alignment is in progress
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already in progress and was not canceled
        if (barrierId == currentCheckpointId) {
            // regular case
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // the incoming barrier belongs to a newer checkpoint than the one
            // being aligned: abort the current checkpoint (it may have timed out,
            // for example), reset blockedChannels and set numBarriersReceived
            // back to 0, then begin aligning for the new checkpoint (barrierId)
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                inputGate.getOwningTaskName(),
                barrierId,
                currentCheckpointId);

            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
            releaseBlocksAndResetBarriers();
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // numBarriersReceived == 0: begin a new checkpoint alignment and
        // mark this channel as blocked (blockedChannels[channelIndex] = true)
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                inputGate.getOwningTaskName(),
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }
        // alignment complete: release the buffered data (held in the BufferBlocker)
        // so that it can be consumed, then trigger the checkpoint
        releaseBlocksAndResetBarriers();
        notifyCheckpoint(receivedBarrier);
    }
}


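Overall alignment flow: when an operator has several upstream inputs and receives a checkpoint barrier on one of them, it marks that input channel as blocked and buffers any data subsequently read from it. Once the checkpoint barriers from all remaining channels have arrived, the buffered data is released for processing and the checkpoint is triggered.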
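The flow above can be sketched as a small standalone simulation. This is a simplified model and not the BarrierBuffer implementation: the class, the Event type, and the "CHK-n" marker are hypothetical, and it deliberately leaves out closed channels, the alignment size limit, and the case where a newer barrier aborts an alignment in progress.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of aligned barrier handling (hypothetical, not Flink code).
class AlignmentSketch {

    // Either a data record (data != null) or a checkpoint barrier (data == null).
    static final class Event {
        final int channel;
        final String data;
        final long barrierId;

        private Event(int channel, String data, long barrierId) {
            this.channel = channel;
            this.data = data;
            this.barrierId = barrierId;
        }
        static Event data(int channel, String data) { return new Event(channel, data, -1); }
        static Event barrier(int channel, long id) { return new Event(channel, null, id); }
        boolean isBarrier() { return data == null; }
    }

    private final boolean[] blockedChannels;
    private final ArrayDeque<Event> buffered = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    private int numBarriersReceived;

    AlignmentSketch(int numChannels) {
        blockedChannels = new boolean[numChannels];
    }

    void handle(Event e) {
        if (blockedChannels[e.channel]) {
            buffered.add(e);                 // channel is aligning: hold the event back
            return;
        }
        if (!e.isBarrier()) {
            output.add(e.data);              // normal record: pass it on
            return;
        }
        blockedChannels[e.channel] = true;   // barrier: block this channel
        numBarriersReceived++;
        if (numBarriersReceived == blockedChannels.length) {
            // all barriers arrived: unblock, trigger the checkpoint, replay the buffer
            Arrays.fill(blockedChannels, false);
            numBarriersReceived = 0;
            output.add("CHK-" + e.barrierId);
            List<Event> replay = new ArrayList<>(buffered);
            buffered.clear();
            for (Event b : replay) {
                handle(b);
            }
        }
    }

    List<String> output() { return output; }

    public static void main(String[] args) {
        AlignmentSketch s = new AlignmentSketch(2);
        s.handle(Event.data(0, "a"));
        s.handle(Event.barrier(0, 1));
        s.handle(Event.data(0, "b"));   // buffered: channel 0 is blocked
        s.handle(Event.data(1, "x"));
        s.handle(Event.barrier(1, 1));  // completes the alignment
        s.handle(Event.data(1, "y"));
        System.out.println(s.output());
    }
}
```

In this run, record b arrives right after the barrier on channel 0 but is seen by the operator only after the CHK-1 marker, so the snapshot contains exactly the records that preceded the barriers on both channels.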

Non-aligned - BarrierTracker

The non-aligned mode is simpler by comparison: no data is buffered, and the checkpoint is triggered as soon as the checkpoint barriers from all channels have arrived.

public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
        if (!next.isPresent()) {
            // buffer or input exhausted
            return null;
        }

        BufferOrEvent bufferOrEvent = next.get();
        if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else {
            // some other event
            return bufferOrEvent;
        }
    }
}

The processBarrier method:

private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();
    // single input channel: trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
        notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
        return;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
        LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount cbc = null;
    int pos = 0;
    // look for the pending entry with the same checkpoint ID
    for (CheckpointBarrierCount next : pendingCheckpoints) {
        if (next.checkpointId == barrierId) {
            cbc = next;
            break;
        }
        pos++;
    }

    if (cbc != null) {
        // add one to the count to that barrier and check for completion
        int numBarriersNew = cbc.incrementBarrierCount();
        if (numBarriersNew == totalNumberOfInputChannels) {
            // barriers from all channels have arrived: drop this entry and
            // every older one, then trigger the checkpoint
            for (int i = 0; i <= pos; i++) {
                pendingCheckpoints.pollFirst();
            }

            // notify the listener
            if (!cbc.isAborted()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received all barriers for checkpoint {}", barrierId);
                }

                notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
            }
        }
    }
    else {
        // first barrier seen for this checkpoint ID: start tracking it
        if (barrierId > latestPendingCheckpointID) {
            latestPendingCheckpointID = barrierId;
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

            // make sure we do not track too many checkpoints
            if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pendingCheckpoints.pollFirst();
            }
        }
    }
}

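Overall non-aligned flow: when an operator has several upstream inputs, no data is buffered for any checkpoint; records are passed straight on for processing. What is tracked instead is checkpoint bookkeeping, kept in a queue of CheckpointBarrierCount objects. Each CheckpointBarrierCount identifies one checkpoint and counts the checkpoint barriers received for it across the input channels; when that count equals the number of channels, the checkpoint is triggered.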
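The counting scheme can be sketched on its own, without any Flink types. The class below is a hypothetical model rather than the BarrierTracker source: it ignores abort markers, and the tracking limit of 50 is an assumed value chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of non-aligned barrier tracking (hypothetical, not Flink code).
class TrackerSketch {

    // Assumed limit for this sketch.
    static final int MAX_CHECKPOINTS_TO_TRACK = 50;

    // One pending checkpoint and the number of barriers seen for it so far.
    private static final class Count {
        final long checkpointId;
        int barriers = 1; // created on the first barrier

        Count(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final int numChannels;
    private final ArrayDeque<Count> pending = new ArrayDeque<>();
    private final List<Long> triggered = new ArrayList<>();
    private long latestPendingId = -1;

    TrackerSketch(int numChannels) { this.numChannels = numChannels; }

    void onBarrier(long barrierId) {
        if (numChannels == 1) {
            triggered.add(barrierId);       // single input: trigger immediately
            return;
        }
        Count match = null;
        int pos = 0;
        for (Count c : pending) {           // find the entry for this checkpoint
            if (c.checkpointId == barrierId) {
                match = c;
                break;
            }
            pos++;
        }
        if (match != null) {
            if (++match.barriers == numChannels) {
                // complete: drop this entry and all older ones, then trigger
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                triggered.add(barrierId);
            }
        } else if (barrierId > latestPendingId) {
            latestPendingId = barrierId;    // first barrier of a newer checkpoint
            pending.addLast(new Count(barrierId));
            if (pending.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pending.pollFirst();
            }
        }
        // otherwise: late barrier of an already dropped checkpoint, ignored
    }

    List<Long> triggered() { return triggered; }
    int pendingSize() { return pending.size(); }

    public static void main(String[] args) {
        TrackerSketch t = new TrackerSketch(2);
        t.onBarrier(1); // channel 0, checkpoint 1
        t.onBarrier(2); // channel 0, checkpoint 2
        t.onBarrier(2); // channel 1, checkpoint 2: complete, checkpoint 1 is dropped
        t.onBarrier(1); // channel 1, checkpoint 1: arrives too late, ignored
        System.out.println(t.triggered() + ", pending=" + t.pendingSize());
    }
}
```

The run in main shows one consequence of the design: when checkpoint 2 completes before checkpoint 1, the older entry is discarded along with it, so only checkpoint 2 is ever triggered and the late barrier for checkpoint 1 is ignored.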
