线性一致性实现原理剖析

背景

当我们讨论分布式系统时,通常会说到CAP理论,而这里的C一致性一般来说指的就是线性一致性(Linearizability),而对于开发者来说,可能更多的会去关注自己平常使用的一些中间件、类库,比如Etcd、Zookeeper等能够提供怎么的一致性,这两个类库我们知道是在Raft、Paxos(ZAB)的基础之上实现的,因此这篇文章我们就来看一下如果线性一致性实际是如何实现的。

实现

可能大多数人都有个误解,一个分布式系统,比如ZK、etcd等,如果正确的实现了共识算法(Raft/Paxos),那么它就能够提供强一致,这个理解其实是不正确的,Raft只能够保证不同节点对于raft日志达成一致,但对于库的使用者来说,实际上对外提供服务的是底层的状态机,比如说一个KV存储,每个raft日志记录的是实际的操作,比如 set a 1 set a 2 等,而如果只有日志的话,我们怎么查询呢?轮训一遍日志?显然不现实,因此必须将这个状态存起来,比如RocksDB,那么底层raft日志的一致性由raft本身去保证,而上层业务方状态机的一致性该如何去保证呢?

Raft是由leader驱动的共识算法,所有的写入请求都由leader来处理,并将日志同步到follower,然后再将日志依次应用的自身的状态机,比如RocksDB,但由于网络延迟、机器负载等原因,每个节点不可能同时将日志应用到RocksDB,因此对于不同的节点来说,RocksDB的数据快照肯定不是实时一致的,并且这里会涉及到很多的corner case,比如leader切换,也就是说leader的数据也不一定是最新的,因此实际实现的时候需要考虑好这些case。

这里暂不考虑异常情况,对于raft来说,写请求都是由leader处理并同步到follower,因此leader的数据通常是最新的,但如果用户发来一个读取请求,我们直接从状态机读取的话,这里其实是会读到过期数据的,因此这里分为两步,已提交的日志 -> 已经应用到状态机的日志,因此如果不做特殊处理的话,由于还有部分日志没有应用到状态机,直接处理的话必然会造成不一致。优化的方法大致有几种:

  • 读取也走raft log
  • Read Index
  • Lease Read

读取也走raft log

我们很容易想到,让读取的请求也走一遍raft流程,由于raft日志是全局严格有序的,读写也必然是有序的,因此当处理到读取的日志的时候,能够保证之前的写入请求都已经处理完成并落到状态机,因此这个时候处理读取请求是安全的,不会造成过期读的问题,也能够满足我们说的线性一致性,但这个方法有个很明显的缺点: 性能非常低,每次读取都走一遍raft流程,涉及到网络、磁盘IO等资源,而对于大部分场景来说,都是写少读多,因此如果不对读取进行优化的话,整个类库的性能会非常低效。

Read Index

Read index读的流程这里先简单说一下,当leader接收到一个读取请求时:

  • 将当前日志的commit index记录到一个本地变量readIndex中,封装到消息体中

  • 首先需要确认自己是否仍然是leader,因此需要向其他节点都发起一次心跳

  • 如果收到了大多数节点的心跳响应,那么说明该server仍然是leader身份

  • 在状态机执行的地方,判断下apply index是否超过了readIndex,如果超过了,那么就表明发起该读取请求时,所有之前的日志都已经处理完成,也就是说能够满足线性一致性读的要求

  • 从状态机去读取结果,返回给客户端

我们可以看到,和刚刚的方法相比,read index采用心跳的方式首先确认自己仍然是leader,然后等待状态机执行到了发起读取时所有日志,就可以安全的处理客户端请求了,这里虽然还有一次心跳的网络开销,但一方面心跳包本身非常小,另外处理心跳的逻辑非常简单,比如不需要日志落盘等,因此性能相对之前的方法会高非常多。

使用了Read Index的方法,我们还可以提供follower节点读取的功能,并可以在follower上实现线性一致性读,逻辑和leader有些差异:

  • Follower向leader查询最新的readIndex

  • leader会按照上面说的,走一遍流程,但会在确认了自己leader的身份之后,直接将readIndex返回给follower

  • Follower等待自己的状态机执行到了readIndex的位置之后,就可以安全的处理客户端的读请求

    接下来我们看看sofa-jraft是如何实现Read Index的

// 请求 ID 作为请求上下文传入
    final byte[] reqContext = new byte[4];
    Bits.putInt(reqContext, 0, requestId.incrementAndGet());
    // 调用 readIndex 方法,等待回调执行
    this.node.readIndex(reqContext, new ReadIndexClosure() {

        @Override
        public void run(Status status, long index, byte[] reqCtx) {
            if (status.isOk()) {
                //处理用户读请求
            } else {
                // 特定情况下,比如发生选举,该读请求将失败
                asyncContext.sendResponse(new BooleanCommand(false, status.getErrorMsg()));
            }
        }
    });

通过 Node#readIndex(byte [] requestContext, ReadIndexClosure done) 发起一次线性一致性读请求:

private ReadOnlyService                                                readOnlyService;


    @Override
    public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
        if (this.shutdownLatch != null) {
            Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
            throw new IllegalStateException("Node is shutting down");
        }
        Requires.requireNonNull(done, "Null closure");
        //异步执行,添加到ReadIndex队列中
        this.readOnlyService.addRequest(requestContext, done);
    }
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {

    /** Disruptor to run readonly service. */
    private Disruptor<ReadIndexEvent>                  readIndexDisruptor;
    private RingBuffer<ReadIndexEvent>                 readIndexQueue;

 @Override
    public boolean init(final ReadOnlyServiceOptions opts) {
        this.node = opts.getNode();
        this.nodeMetrics = this.node.getNodeMetrics();
        this.fsmCaller = opts.getFsmCaller();
        this.raftOptions = opts.getRaftOptions();

        this.scheduledExecutorService = Executors
                .newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
        this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent> newInstance() //
                .setEventFactory(new ReadIndexEventFactory()) //
                .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
                .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
                .setWaitStrategy(new BlockingWaitStrategy()) //
                .setProducerType(ProducerType.MULTI) //
                .build();
        //消费者
        this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
        this.readIndexDisruptor
            .setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
        this.readIndexQueue = this.readIndexDisruptor.start();
        if(this.nodeMetrics.getMetricRegistry() != null) {
            this.nodeMetrics.getMetricRegistry().register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
        }
        // listen on lastAppliedLogIndex change events.
        this.fsmCaller.addLastAppliedLogIndexListener(this);

        // start scanner
        this.scheduledExecutorService.scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
            this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
        return true;
    }


    @Override
    public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
        if (this.shutdownLatch != null) {
            //如果节点已关闭,直接返回失败
            Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
            throw new IllegalStateException("Service already shutdown.");
        }
        try {
            EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
                event.done = closure;//回调
                event.requestContext = new Bytes(reqCtx); //请求上下文
                event.startTime = Utils.monotonicMs();//记录当前时间戳
            };
            int retryTimes = 0;
            while (true) {
                //放到队列中
                if (this.readIndexQueue.tryPublishEvent(translator)) {
                    break;
                } else {
                   //如果失败了则重试,最大3次
                    retryTimes++;
                    if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                        Utils.runClosureInThread(closure,
                            new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                        this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                        LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                        return;
                    }
                    //休息一会,避免占用CPU
                    ThreadHelper.onSpinWait();
                }
            }
        } catch (final Exception e) {
            Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
        }
    }
}

典型的生产者、消费者模型,底层队列采用disruptor的 RingBuffer , 这里主要是会合并ReadIndex请求,这里提下性能优化非常常用的一个手段:batch合并:

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
        // task list for batch
        private final List<ReadIndexEvent> events = new ArrayList<>(
                                                      ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

        @Override
        public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
                                                                                                         throws Exception {
            if (newEvent.shutdownLatch != null) {
                executeReadIndexEvents(this.events);
                this.events.clear();
                newEvent.shutdownLatch.countDown();
                return;
            }

            this.events.add(newEvent);
            //合并ReadIndex请求,默认32个
            if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
                executeReadIndexEvents(this.events);
                this.events.clear();
            }
        }
    }

   private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
        if (events.isEmpty()) {
            return;
        }
        //构造消息体
        final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
            .setGroupId(this.node.getGroupId()) //
            .setServerId(this.node.getServerId().toString());

        final List<ReadIndexState> states = new ArrayList<>(events.size());

        for (final ReadIndexEvent event : events) {
            rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
            states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
        }
        final ReadIndexRequest request = rb.build();
        
        this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
    }

可以看到,实际处理ReadIndex请求的是 Node#handleReadIndexRequest , 这里注意,会在上层进行合并,另外这里传入了一个 ReadIndexResponseClosure 回调,这个回调会在节点确认了自己leader的身份之后执行

/**
     * Handle read index request.
     */
    @Override
    public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
        final long startMs = Utils.monotonicMs();
        this.readLock.lock();
        try {
            switch (this.state) {
                //如果是leader的话,直接处理即可
                case STATE_LEADER:
                    readLeader(request, ReadIndexResponse.newBuilder(), done);
                    break;
                case STATE_FOLLOWER:
                    //如果该节点是follower的话,需要向leader查询readIndex
                    readFollower(request, done);
                    break;
                case STATE_TRANSFERRING:
                    //如果leader正在迁移,则直接返回
                    done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                    break;
                default:
                    done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                    break;
            }
        } finally {
            this.readLock.unlock();
            this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
            this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
        }
    }

//如果是follower的话,需要向leader查询readIndex
private void readFollower(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> closure) {
        if (this.leaderId == null || this.leaderId.isEmpty()) {
            closure.run(new Status(RaftError.EPERM, "No leader at term %d.", this.currTerm));
            return;
        }
        // send request to leader.
        final ReadIndexRequest newRequest = ReadIndexRequest.newBuilder() //
            .mergeFrom(request) //
            .setPeerId(this.leaderId.toString()) //
            .build();
        this.rpcService.readIndex(this.leaderId.getEndpoint(), newRequest, -1, closure);
    }

    private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                            final RpcResponseClosure<ReadIndexResponse> closure) {
        final int quorum = getQuorum();
        if (quorum <= 1) {
            // Only one peer, fast path.
            respBuilder.setSuccess(true) //
                .setIndex(this.ballotBox.getLastCommittedIndex());
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            return;
        }
        
        //记录当前日志的commit index,即readIndex
        final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
        //leader刚启动时,需要先提交一条日志,确认自己的leader身份
        if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
            // Reject read only request when this leader has not committed any log entry at its term
            closure
                .run(new Status(
                    RaftError.EAGAIN,
                    "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.",
                    lastCommittedIndex, this.currTerm));
            return;
        }
        respBuilder.setIndex(lastCommittedIndex);
        
        //如果该请求来自follower,需要确认下该follower是否仍然在该raft集群中
        if (request.getPeerId() != null) {
            // request from follower, check if the follower is in current conf.
            final PeerId peer = new PeerId();
            peer.parse(request.getServerId());
            if (!this.conf.contains(peer)) {
                closure
                    .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: {}.", peer, this.conf));
                return;
            }
        }
        
        ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
        if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
            // If leader lease timeout, we must change option to ReadOnlySafe
            readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
        }

        switch (readOnlyOpt) {
            case ReadOnlySafe:
                final List<PeerId> peers = this.conf.getConf().getPeers();
                Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
                final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                    respBuilder, quorum, peers.size());
                // Send heartbeat requests to followers
                for (final PeerId peer : peers) {
                    if (peer.equals(this.serverId)) {
                        continue;
                    }
                    //向所有其他的节点发送心跳包
                    this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
                }
                break;
            case ReadOnlyLeaseBased:
                // Responses to followers and local node.
                respBuilder.setSuccess(true);
                closure.setResponse(respBuilder.build());
                closure.run(Status.OK());
                break;
        }
    }

private class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter<AppendEntriesResponse> {
        final ReadIndexResponse.Builder             respBuilder;
        final RpcResponseClosure<ReadIndexResponse> closure;
        final int                                   quorum;
        final int                                   failPeersThreshold;
        int                                         ackSuccess;
        int                                         ackFailures;
        boolean                                     isDone;

        public ReadIndexHeartbeatResponseClosure(final RpcResponseClosure<ReadIndexResponse> closure,
                                                 final ReadIndexResponse.Builder rb, final int quorum,
                                                 final int peersCount) {
            super();
            this.closure = closure;
            this.respBuilder = rb;
            this.quorum = quorum;
            this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum;
            this.ackSuccess = 0;
            this.ackFailures = 0;
            this.isDone = false;
        }

        @Override
        public synchronized void run(final Status status) {
            if (this.isDone) {
                return;
            }
            if (status.isOk() && getResponse().getSuccess()) {
                this.ackSuccess++;
            } else {
                this.ackFailures++;
            }
            // 如果收到了大多数节点的心跳包,那么说明该节点仍然是leader
            // 执行回调
            if (this.ackSuccess + 1 >= this.quorum) {
                this.respBuilder.setSuccess(true);
                this.closure.setResponse(this.respBuilder.build());
                this.closure.run(Status.OK());
                this.isDone = true;
            } else if (this.ackFailures >= this.failPeersThreshold) {
                this.respBuilder.setSuccess(false);
                this.closure.setResponse(this.respBuilder.build());
                this.closure.run(Status.OK());
                this.isDone = true;
            }
        }
    }

在确认了leader的身份之后,需要等待状态机执行到readIndex:

class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexResponse> {

        final List<ReadIndexState> states;
        final ReadIndexRequest     request;

        public ReadIndexResponseClosure(final List<ReadIndexState> states, final ReadIndexRequest request) {
            super();
            this.states = states;
            this.request = request;
        }

        /**
         * Called when ReadIndex response returns.
         */
        @Override
        public void run(final Status status) {
            if (!status.isOk()) {
                notifyFail(status);
                return;
            }
            final ReadIndexResponse readIndexResponse = getResponse();
            if (!readIndexResponse.getSuccess()) {
                notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
                return;
            }
            // Success
            final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
                readIndexResponse.getIndex());
            for (final ReadIndexState state : this.states) {
                // Records current commit log index.
                state.setIndex(readIndexResponse.getIndex());
            }

            boolean doUnlock = true;
            ReadOnlyServiceImpl.this.lock.lock();
            try {
	              //如果状态机已经执行到了readIndex,那么说明可以处理用户的请求了
                if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
                    // Already applied,notify readIndex request.
                    ReadOnlyServiceImpl.this.lock.unlock();
                    doUnlock = false;
                    notifySuccess(readIndexStatus);
                } else {
                    // 状态机还没有执行到readIndex,需要放到pending队列
                    ReadOnlyServiceImpl.this.pendingNotifyStatus
                    .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)).add(readIndexStatus);
                }
            } finally {
                if (doUnlock) {
                    ReadOnlyServiceImpl.this.lock.unlock();
                }
            }
        }

        private void notifyFail(final Status status) {
            final long nowMs = Utils.monotonicMs();
            for (final ReadIndexState state : this.states) {
                ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", nowMs - state.getStartTimeMs());
                final ReadIndexClosure done = state.getDone();
                if (done != null) {
                    final Bytes reqCtx = state.getRequestContext();
                    done.run(status, ReadIndexClosure.INVALID_LOG_INDEX, reqCtx != null ? reqCtx.get() : null);
                }
            }
        }
    }

Lease Read

虽然 ReadIndex Read 比第一种方法快很多,但我们发现还是有一次心跳包的网络开销,raft论文里提到了一种更激进的方式,也就是leader其实都存在一个'在为期', 也就是说leader会向follower发送心跳包,当收到大多数节点的回复后即确认了leader身份,并记录下时间戳t1,如果读过raft论文我们会知道,follower会在Election Timeout超时后,才会认为leader挂掉,并发起一次新的选举,也就是说在下一次leader选举肯定发生在t1 + Election Timeout +/- 时钟偏移量之后,因此第二种方法的中,通过发送心跳包确认leader身份这一步就可以省略掉了,在Election Timeout内只需要确认一次即可,相对一种方式,不仅仅不需要写raft日志,连心跳包都省略掉了,可想而知,性能会大大提升,但这里存在一个潜在的问题是,服务器的时间可能会偏移,存在一定的误差,如果偏移过大的话,这种机制就不够安全。

Zookeeper的一致性保证

Zookeeper是基于ZAB(类Paxos)协议实现的,我们可能常常有一个误解,就是ZK能够提供强一致,但其实默认情况下ZK并不能提供全局一致的数据视图,或者说线性一致性,比如说有两个客户端A和B,如果客户端将一个节点a的值从0改成1,并告诉客户端B去读取a,由于客户端连接的zk服务器不同,可能读取到0,也可能读到1。但如果你需要更强的一致性,期望让A和B读取到同样的值,可以通过执行 Zookeeper#sync() 方法,并在回调中读取a的值。

也就是说默认情况下,zk并不能保证线性一致性读,也就是由于网络延迟、gc等原因,不同的节点接受到日志的时间不一致、应用到本地内存数据库的时间也有差异,但对于读取请求来说,每一个zk服务器都会对外提供服务,即使是follower也是一样,也就是说follower有一层本地缓存,这也是zk读取性能很高的一个原因,如果需要更高的读取性能,我们可以增加更多的follower节点,均衡读取的压力,但要注意增加更多的节点,也意味着写请求需要同步到更多的节点,也就是写入的性能会下降,这个需要业务方自己去权衡。 但如果业务方有线性一致性读的要求,可以通过sync调用,基本原理其实和read index差不多,来客户端连接的zk服务器本地的内存数据库追上最新的日志。

和读取不同,写入请求( set/delete )是满足线性一致性写的,也就是写入必须要走一次ZAB流程,但读取,由于每个客户端连接的zk服务节点不一致,而每个zk服务节点都会对外提供写服务,才会导致可能读到过期值。

总结

本文围绕raft以及zk的线性一致性实现细节以及相关的原理, 这里其实还涉及到很多的细节,后面会在单独的博客中补充。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章