ZooKeeper (四) 源码剖析:数据模型和存储

在前面几篇博客中,我总结了 ZooKeeper 的源码调试环境构建、集群群首选举、NIO 网络连接的博客,文章说实话写得比较乱,自己有些细节仍然是一知半解,或许写得不合理,对 ZooKeeper 有兴趣的同学可以在文章下留言,提提问题,大家一起讨论一下。这是我认为最适合学习源码的方法之一。

虽然不是很恰当,上图我自己对 ZooKeeper 的一些模块划分。Client 部分的代码有机会再分析,这篇文档把数据模型剖析清楚,应该算是在大体上把 ZooKeeper 的源码连接起来了,再去编写一个Watch模块(未来会分析),那自己去写一个简单的 ZooKeeper 应该也没问题。

前文连接:

PS: 本文对 ZooKeeper 的存储讨论都是基于单节点模型下的,集群模式下的 ZooKeeper 存储涉及到 Zab 一致性协议,后面有机会再讲。

ZK的存储:ZKDatabase、DataTree、FileTxnSnapLog

ZooKeeper 本质上是一个 K-V 存储中间件,这样基本上就必然会考虑数据的 增删查改 还有 持久化 的问题。 ZooKeeper 在API上是按照 树型 样式来设计的,在数据结构上也差不多, ZKDatabase 则是这个数据结构的抽象。

ZKDatabase

从注释上看, ZKDatabase 的作用有如下几个:

  • 维护内存数据库:指的是字段 DataTree
  • 维护通信 session(ServerCnxn) 和有 watch(监控相关),这部分其实是委托给 DataTree 去完成的

这个对象是随着 ZooKeeper 的 main 方法启动而启动的,启动后会通过 FileTxnSnapLog 读取日志,构建 DataTree。

/**
 * This class maintains the in memory database of zookeeper
 * server states that includes the sessions, datatree and the
 * committed logs. It is booted up  after reading the logs
 * and snapshots from the disk.
 */
public class ZKDatabase {
    // 存储数据结构
    protected DataTree dataTree;
    // 快照日志
    protected FileTxnSnapLog snapLog;
}

复制代码

DataTree

DataTree 是负责维护树形数据结构的类。我认为核心的字段和方法有以下几个:

public class DataTree {
    字段:
    1. NodeHashMap nodes; 全路径 -> DataNode 的 HashMap 映射
    2. dataWatches, childWatches 监视器
    3. PathTrie pTrie; 前缀树,提供树的数据结构,响应客户端的操作。遍历、查找之类的
    
    方法:
    1. serialize(OutputArchive oa, String tag) // 序列化
    2. deserialize(InputArchive ia, String tag) // 反序列化
    3. processTxn(TxnHeader header, Record txn) // 响应事务,对节点进行 增删改
}

复制代码

FileTxnSnapLog

FileTxnSnapLog 是用于保存 快照(Snap) 文件和 事务(Txn )文件的。所以这个类最终的是统筹 事务文件、快照文件 的操作。

public class FileTxnSnapLog {
    //the directory containing the
    //the transaction logs
    final File dataDir;     // 事务文件 句柄
    //the directory containing the
    //the snapshot directory
    final File snapDir;     // 快照文件 句柄
    TxnLog txnLog;      // 事务Log
    SnapShot snapLog;   // 快照Log
    
    
    method:
    // 反序列化,把数据还原到 DataTree 中,ZooKeeper 启动的时候调用
    public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
    
    // 把 DataTree 序列化到快照文件中。
    public void save(
        DataTree dataTree,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
        boolean syncSnap) throws IOException
    
}
复制代码

完成了三个我认为 ZooKeeper 数据模型比较核心的类的介绍后,下面分析两个关于 ZooKeeper 数据存储的细节。

  • ZooKeeper 持久化策略及源码
  • 响应 Client 的事务请求

DataTree 的持久化策略

图片引用自: zhuanlan.zhihu.com/p/72176940

ZooKeeper 作为一个存储中间件,持久化是一定需要解决的。在这里,ZooKeeper 使用的策略分两部分:

  • 快照文件保存 Snapshot
  • 事务文件保存 Txnlog

保存的原理和 Redis 基本是一致的,两种文件的优缺点也一样。在恢复数据的时候,ZooKeeper 是先执行快照文件,然后再尝试把落后的事务数据通过 Txnlog 进行 回放 ,最终达到恢复最新的事务数据。

下面简单剖析一下 初始化ZK数据的源码修改ZK数据的源码

// ZKDatabase.java 
public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();

    /**
     * snapLog 的类是包括: 快照文件+事务文件的,restore 方法是通过两个文件还原数据,并返回最新的 zxid
     */
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);    // 从此处切入
    initialized = true;
    long loadTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
    LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
            loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
    return zxid;
}

// FileTxnSnapLog.java 
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    long snapLoadingStartTime = Time.currentElapsedTime();

    // 通过快照文件 snapLog,反序列化 dataTree,并返回序列化的最大的 zxid 。
    // 具体的做法,就是遍历快照文件,生成树。(Step 1)
    long deserializeResult = snapLog.deserialize(dt, sessions); 
    ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    --- 省略代码
    File initFile = new File(dataDir.getParent(), "initialize");

    RestoreFinalizer finalizer = () -> {

        // 通过快照生成的 dataTree 有可能是落后与版本的,此时需要通过 事务日志 在快照的基础上,回放部分事务,并获得最新的数据
        // 具体的做法:根据当前dt的最大事务数据为起点,回放后面的事务 (Step 2)
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        // The snapshotZxidDigest will reset after replaying the txn of the
        // zxid in the snapshotZxidDigest, if it's not reset to null after
        // restoring, it means either there are not enough txns to cover that
        // zxid or that txn is missing
        DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
        if (snapshotZxidDigest != null) {
            LOG.warn(
                    "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                            + "which might lead to inconsistent state",
                    Long.toHexString(highestZxid),
                    Long.toHexString(snapshotZxidDigest.getZxid()));
        }
        return highestZxid;     // 返回正在的,最大的 zxId
    };
    --- 省略代码
    if (-1L == deserializeResult) {
        // 找不到数据库,就尝试重新建造一个数据库
        /* this means that we couldn't find any snapshot, so we need to
         * initialize an empty database (reported in ZOOKEEPER-2325) */
        if (txnLog.getLastLoggedZxid() != -1) {
            // ZOOKEEPER-3056: provides an escape hatch for users upgrading
            // from old versions of zookeeper (3.4.x, pre 3.5.3).
            if (!trustEmptySnapshot) {
                throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
            } else {
                LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                return finalizer.run();
            }
        }
        --- 省略代码
        return finalizer.run();     // 执行回放,并返回最大的 zxid
    }
复制代码

Client 的事务请求

以上为初始化 ZooKeeper 数据库的核心逻辑。由于篇幅原因,Step1 和 Step2 的实现不进一步剖析了。下面分析一下这个 ZKDatabase 如何处理客户端的事务 Request。

这里先说一下 ZooKeeper 的处理逻辑:

  1. 在网络层接受到 Request 后,通过 SyncRequestProcessor 把 Request 写入事务日志中
  2. SyncRequestProcessor 根据 数据统计,判断是否需要把 DataTree 写一个快照
  3. FinalRequestProccessor 最后获得这个 Request 后,会对这个 Request 执行响应处理,修改 DataTree 数据

现在看看代码是怎么处理的:

SyncRequestProcessor.java 是一个线程,因此逻辑都在 run 方法中。 // 方法已经被精简了,源码会更加复杂一点
@Override
public void run() {
    // 这个方法主要是 1、写事务日志;2、尝试写快照
    try {
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        resetSnapshotStats();
        lastFlushTime = Time.currentElapsedTime();
        while (true) {
            // 从队列中取出 Request
            Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
            if (si == null) {
                /* We timed out looking for more writes to batch, go ahead and flush immediately */
                flush();    // 假如取不到数据,就可以尝试进行flush,把数据都写到 DataTree 中去 从此处切入!
                si = queuedRequests.take();
            }
            // track the number of records written to the log
            if (!si.isThrottled() && zks.getZKDatabase().append(si)) {  // append 一个 Request,只是写事务日志
                if (shouldSnapshot()) {
                        // 临时起一个线程,对 DataTree 进行快照写盘
                        new ZooKeeperThread("Snapshot Thread") {
                            public void run() {
                                try {
                                    zks.takeSnapshot();
                                } catch (Exception e) {
                                    LOG.warn("Unexpected exception", e);
                                } finally {
                                    snapThreadMutex.release();
                                }
                            }
                        }.start();
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }
}

private void flush() throws IOException, RequestProcessorException {
    if (this.toFlush.isEmpty()) {
        return;
    }

    ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());

    long flushStartTime = Time.currentElapsedTime();
    zks.getZKDatabase().commit();
    ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);

    if (this.nextProcessor == null) {
        this.toFlush.clear();
    } else {
        while (!this.toFlush.isEmpty()) {
            final Request i = this.toFlush.remove();
            long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
            ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
            // 这里切入,Processor 的调用链传递 Requets 参数,响应 Requegt 请求,把数据同步到 DataTree 中。
            // 此处代码比较深入,整体逻辑是吧 Request i 的操作写到 DataTree 中
            this.nextProcessor.processRequest(i);      
        }
        if (this.nextProcessor instanceof Flushable) {
            ((Flushable) this.nextProcessor).flush();
        }
    }
    lastFlushTime = Time.currentElapsedTime();
}
复制代码

RequestProcessor 的特点

网络层接收到包后,进行一次请求封装,生成 Request 对象,交给 RequestProccessor 调用链进行处理。具体的做法是 Request 对象投入 RequestProccessor 的请求处理队列中,然后 RequestProccessor 自动消费并传递。

在这里,调用链的构建比较有意思,是通过 Wrapper 层层包装的。(Dubbo SPI 的 Wrapper 也是采用类似的做法)

// ZooKeeperServer.java ,构建 RequestProcessors 责任链
protected void setupRequestProcessors() {
    // 通过代理模式层层封装 Request 处理器
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);      // 最后执行的放在里面
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();     // ZooKeeper 单线程 处理事务
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);         // 最前执行的放在外面
    ((PrepRequestProcessor) firstProcessor).start();    // 单线程处理事务,处理完后,提交到下一个 Processor 的 Request 队列里,下一个继续消费
}
复制代码

另外,SyncRequestProcessor 类和 PrepRequestProcessor 类都是 Thread,并且全局只有一个,这样是了保证处理模型是单线程的,这样有两个好处:

  1. 让 ZKDatabase 保持简单,不用考虑并发问题;
  2. 减少线程切换的成本;

同时这里也反应出一个问题,为了保证存储的持久性,每个事务 Request 都需要写到事务文件才返回,这也意味着,ZooKeeper 是单线程同步写文件的,所以它的写性能就相当差,不适合做常规的存储服务。当然,它的定位是:分布式协调服务,这样的设计也是没问题的。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章