Zookeeper Watcher 流程分析(结合源码)

概述

ZK提供了分布式数据的 发布/订阅 功能,一个典型的发布/订阅模型系统定义了一种 一对多 的订阅关系,能够让多个订阅者同时监听某个主题对象,当这个主题对象自身状态发生变化时,会通知所有的订阅者。在ZK中引入了 Watcher 机制来实现这种 分布式的通知功能

ZK允许客户端向服务器端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher ,那么就会向指定客户端发送一个事件通知来实现分布式通知功能。

大致流程就是 Client 向ZK中注册 Watcher ,如果注册成功的话,会将对应的 Watcher 存储在本地。当ZK服务器端触发 Watcher 事件之后,会向客户端发送通知,客户端会从 ClientWatchManager 中取出对应的 Watcher 进行回调。

Watcher 接口

说了那么久、 Watcher 究竟是啥?有什么用处?

/**
 * This interface specifies the public interface an event handler class must
 * implement. A ZooKeeper client will get various events from the ZooKeeper
 * server it connects to. An application using such a client handles these
 * events by registering a callback object with the client. The callback object
 * is expected to be an instance of a class that implements Watcher interface.
 */
@InterfaceAudience.Public
public interface Watcher {
    void process(WatchedEvent event);
}
复制代码

只要你通过这个接口的实现类对象去向ZK服务端注册监听,那么当有ZK服务端有事件通知到Client,那么就会回调这个 process 方法。

WatchedEvent

那么 WatchedEvent 又有什么玄机呢?

public class WatchedEvent {
   /**
    * Enumeration of states the ZooKeeper may be at the event
    */
    private final KeeperState keeperState;
   /**
    * Enumeration of types of events that may occur on the ZooKeeper
    */
    private final EventType eventType;
    private String path;
}
复制代码

KeeperState和 EventType 是两个枚举类,分别代表通知状态和事件类型。 path 就是 client 监听到路径。

常见的 KeeperStateEventType 组合

KeeperState EventType 触发条件 说明
SyncConnected None(-1) 客户端与服务端成功建立会话 客户端和服务端处于连接状态
SyncConnected NodeCreated(1) Watcher 监听对应的数据节点被创建 客户端和服务端处于连接状态
SyncConnected NodeDeleted(2) Watcher 监听对应的数据节点被删除 客户端和服务端处于连接状态
SyncConnected NodeDataChanged(3) Watcher 监听对应的数据节点的内容发生变更( 数据内容和数据版本号 ) 客户端和服务端处于连接状态
SyncConnected NodeChildrenChanged(4) Watcher 监听对应的数据节点的 子节点列表 发生改变 客户端和服务端处于连接状态

关于 NodeDataChanged 事件类型,这里的变更包括节点的数据内容发生变更,也包括数据的版本号( dataVersion ) 变更,所以只要有客户端调用了数据更新接口,不管数据内容是否发生改变、都会导致 dataVersion 发生改变,从而触发对应 Watcher 的监听。这样子就能避免典型乐观锁 ABA 的问题。

WatcherEvent

我们可以在 WatchedEvent 中发现有这么一个方法

/**
     *  Convert WatchedEvent to type that can be sent over network
     */
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
    }
复制代码

笼统的说, WatcherEventWatchedEvent 表示的是同一个事物,都是对服务端事件的封装。 WatchedEvent 是一个用于逻辑处理的对象、而 WatcherEvent 是用于传输的实体对象。从上面的代码我们可以看到,创建 WatcherEvent 的参数就是 WatchedEvent 中各个属性的值。

people.apache.org/~larsgeorge… 中可以看到它实现了 Record 接口

public class WatcherEvent
extends Object
implements org.apache.jute.Record
复制代码

而在 Record 接口中定义了序列化和反序列的方法

@InterfaceAudience.Public
public interface Record {
    void serialize(OutputArchive archive, String tag) throws IOException;
    void deserialize(InputArchive archive, String tag) throws IOException;
}
复制代码

相关组件

相关过程

概括可以分为三个过程

  • 客户端注册 Watcher

  • 服务端处理 Watcher

  • 客户端回调 Watcher

客户端注册 Watcher

我们在创建一个ZK 客户端实例对象的时候、可以向构造方法中传入一个默认的 Watcher

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) 
复制代码

参数中的这个 Watcher 将会被保存在 ZKWatchManager 中,作为整个会话期间的默认的 Watcher

watchManager.defaultWatcher = watcher;
复制代码

除此之外、ZK 客户端也可以通过 getData , getChildren , exist 三个接口向ZK服务端注册 Watcher

我们以 getData 接口来分析

public byte[] getData(final String path, Watcher watcher, Stat stat){
  .....
}
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
        return getData(path, getDefaultWatcher(watch), stat);
}
复制代码

如果我们的参数 watch 为 true , 那么 getDefaultWatcher 就是去拿我们创建Zookeeper 时传入的默认的 Watcher

private Watcher getDefaultWatcher(boolean required) {
        if (required) {
            if (watchManager.defaultWatcher != null) {
                return watchManager.defaultWatcher;
            } else {
                throw new IllegalStateException("Default watcher is required, but it is null.");
            }
        }
        return null;
    }
复制代码

下面是 完整的 getData 代码

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        // 创建 数据类型  的 watch registration
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        // 将客户端change root directory 的路径加上、变回服务端那边正常的路径
        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        // 标记是否有 watcher
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
复制代码
  1. 创建一个 DataWatchRegistration

  2. 转换 path (客户端这边可能 change root directory,发送请求前要将其转为为服务端那边的路径)

  3. 使用 ClientCnxn 提交这个请求

public ReplyHeader submitRequest(
        RequestHeader h,
        Record request,
        Record response,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(
            h,
            r,
            request,
            response,
            null,
            null,
            null,
            null,
            watchRegistration,
            watchDeregistration);
        ....
        ....
        return r;
    }


复制代码

最终这个 Request 被加入到 outgoingQueue中

public Packet queuePacket(
        RequestHeader h,
        ReplyHeader r,
        Record request,
        Record response,
        AsyncCallback cb,
        String clientPath,
        String serverPath,
        Object ctx,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
        Packet packet = null;

        packet = new Packet(h, r, request, response, watchRegistration);
         
        synchronized (state) {
        ...
              ....
                outgoingQueue.add(packet);
            }
        }
复制代码

最终发送请求到服务端,在 SendThread#readResponse 中处理返回结果

void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");
            switch (replyHdr.getXid()) {
            case PING_XID:
               ....
               ....
                return;
              case AUTHPACKET_XID:
                ...
                ...
              return;
                // 处理服务端到通知
            case NOTIFICATION_XID:
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > chrootPath.length()) {
                        event.setPath(serverPath.substring(chrootPath.length()));
                     } else {
                         LOG.warn("Got server path {} which is too short for chroot path {}.",
                             event.getPath(), chrootPath);
                     }
                }

                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                // 加入到事件队列中、由EventThread处理
                eventThread.queueEvent(we);
                return;
            default:
                break;
            }

           // 移除这个Pacjet
            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
                }
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                ....
                .....
            } finally {
               // 将Watcher 保存在 ClientWatchManager
                finishPacket(packet);
            }
        }
复制代码

主要做了啥事情

  1. 反序列化,获取请求头中的 XID 判断是否是服务端到通知、如果是的话、加入到事件队列中、由EventThread去处理
  2. 从 outgoingQueue中移除 Packet。

  3. 调用 finishPacket 函数、进行一些后续处理

protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        ...
        ...   
 }
复制代码

最后回到 WatchRegistration 将对应的 Watcher 注册到对应的 Map<String, Set<Watcher>> 中。

服务端处理 Watcher

先来认识几个主要的组件类

WatchManager 是 ZK 服务端 Watcher 的管理者,其内部管理的 watchTablewatch2Paths 两个存储结构,分别用两个维度对 Watcher 进行存储。

  • watchTable 从数据节点路径的粒度来托管 Watcher。

  • watch2Paths 从 Watcher 的粒度来控制事件触发需要触发的数据节点。

ServerCnxn 是一个 Zookeeper 客户端和负担之间的连接接口、代表了一个客户端和服务端的连接,其默认实现是 NIOServerCnxn ,从 3.4.0 开始引入了基于Netty 的实现 NettyServerCnxn

ServerCnxn 同时实现了 Watcher 接口,因此我们可以将其看作是一个 Watcher 对象.

数据节点的路径和 ServerCnxn 都会被存储在 WatchManager

服务端收到客户端的请求后会在 FinalRequestProcessor#processRequest 中判断当前请求是否需要注册 Watcher。

case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
                path = getDataRequest.getPath();
         // 调用处理 getData 请求的方法
                rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
                requestPathMetricsCollector.registerRequest(request.type, path);
                break;
            }
复制代码
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
       ....
        ....
        // 这注意、客户端是否需要注册 Watcher、请求中只是有一个 boolean 字段来表示
        // 从请求中获取是否需要注册 Watcher
        byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
        return new GetDataResponse(b, stat);
    }
复制代码
public byte[] getData(String path, Stat stat, Watcher watcher)  {
        return dataTree.getData(path, stat, watcher);
    }

public byte[] getData(String path, Stat stat, Watcher watcher)  {
         
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
              // 这里的 dataWatches 就是 IWatchManager 接口对应的实例
                dataWatches.addWatch(path, watcher);
            }
            data = n.data;
        }
        updateReadStat(path, data == null ? 0 : data.length);
        return data;
    }
复制代码

最终会被放置到 watchTablewatch2Paths 中存储

@Override
    public boolean addWatch(String path, Watcher watcher) {
        return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE);
    }

    @Override
    public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
        if (isDeadWatcher(watcher)) {
            return false;
        }
    // 从中拿出 Set
        Set<Watcher> list = watchTable.get(path);
        if (list == null) {
            list = new HashSet<>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);
    // 
        Set<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            paths = new HashSet<>();
            watch2Paths.put(watcher, paths);
        }

        watcherModeManager.setWatcherMode(watcher, path, watcherMode);
        return paths.add(path);
    }
复制代码

Watcher 的触发

NodeDataChange 的触发是我们节点的数据内容或者节点的 dataVersion 发生改变。

那么我们可以来看看 org.apache.zookeeper.server.DataTree#setData 方法

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte[] lastdata = null;
        synchronized (n) {
            lastdata = n.data;
            nodes.preChange(path, n);
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
            nodes.postChange(path, n);
        }
      
    ....
        ....
        updateWriteStat(path, dataBytes);
     // 调用IWatchManager 的方法
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
复制代码
@Override
    public WatcherOrBitSet triggerWatch(String path, EventType type) {
        return triggerWatch(path, type, null);
    }

    @Override
    public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
      // 封装成 WatchedEvent 
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        Set<Watcher> watchers = new HashSet<>();
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        synchronized (this) {
            for (String localPath : pathParentIterator.asIterable()) {
                Set<Watcher> thisWatchers = watchTable.get(localPath);
              // 无监听
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                    if (watcherMode.isRecursive()) {
                         
                    } else if (!pathParentIterator.atParentPath()) {
                        watchers.add(watcher);
                        if (!watcherMode.isPersistent()) {
                          // 移除掉
                            iterator.remove();
                            Set<String> paths = watch2Paths.get(watcher);
                            if (paths != null) {
                              // 从 watch2Paths 中移除掉
                                paths.remove(localPath);
                            }
                        }
                    }
                }
               
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
          // 调用 process 方法
            w.process(e);
        }
    .....
        .....
        return new WatcherOrBitSet(watchers);
    }
复制代码

上面已经提及、 ServerCnxn 实现了 Watcher 接口,我们看看 org.apache.zookeeper.server.NIOServerCnxn#process

@Override
    public void process(WatchedEvent event) {
      // 请求头中的 XID 设置为 -1,上面分析 SendThread.readResponse 的时候提及过
        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
     
        // WatchedEvent 变为 WatcherEvent
        WatcherEvent e = event.getWrapper();
    // 给客户端发送通知
        sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
    }
复制代码

基本流程

  • 封装 WatchedEvent

  • watchTable 中找到对应的 Watcher,并将 watchTablewatch2Paths 中相关的 Watcher 和路径清除掉( 只能触发一次喔 )
  • 调用 process 方法。

客户端回调 Watcher

我们先来认识下 EventThread 这个类

继承自 Thread ,使用 LinkedBlockingQueue<Object> waitingEvents 保存将要处理的事件,然后 ```run`` 方法不断的从队列中获取进行处理。

我们已经知道客户端中由 SendThread#readResponse 处理(这段代码也出现在上面的客户端注册 Watcher 的时候)

case NOTIFICATION_XID:
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > chrootPath.length()) {
                        event.setPath(serverPath.substring(chrootPath.length()));
                     } else {
                         LOG.warn("Got server path {} which is too short for chroot path {}.",
                             event.getPath(), chrootPath);
                     }
                }

                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                // 加入到事件队列中、由EventThread处理
                eventThread.queueEvent(we);
                return;
复制代码

加入到 ```waitingEvents`` 队列中

public void queueEvent(WatchedEvent event) {
            queueEvent(event, null);
        }

        private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // 从 clientWatchManager 中获取对应的 Watcher,也会从对应的 Map中移除 Watcher
              // 一样是一次性的
                watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<Watcher>();
                watchers.addAll(materializedWatchers);
            }
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // 加入到 waitingEvents 中、等待 run 方法 拿出来处理
            waitingEvents.add(pair);
        }
复制代码

run 方法

public void run() {
            try {
                isRunning = true;
                while (true) {
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {
                        processEvent(event);
                    }
                  ......
                  ......
                }}
        }

        private void processEvent(Object event) {
            try {
                if (event instanceof WatcherSetEventPair) {
                    // each watcher will process the event
                    WatcherSetEventPair pair = (WatcherSetEventPair) event;
                    for (Watcher watcher : pair.watchers) {
                        try {
                          // 调用 process 方法,串行同步处理
                            watcher.process(pair.event);
                        } catch (Throwable t) {
                            LOG.error("Error while calling watcher.", t);
                        }
                    }
                } }
          .......
          .......

    }
复制代码

总结

Watcher 的特性

  • 一次性:无论是客户端还是服务端、一旦 Watcher 触发、都会将其从存储中移除。

  • 客户端串行执行: 串行同步执行的过程、千万不要因为一个 Watcher 而影响整个客户端回调 Watcher

  • 轻量: WatchedEvent 是通知机制中最小的通知单元,只包含了三部分的内容: 通知状态、事件类型、节点路径。而不会将节点的内容以通知的方式告知客户端、而是需要客户端收到通知之后、主动去服务端获取数据。

相关文章

ZooKeeper 数据模型

编译运行Zookeeper源码

本文使用 mdnice 排版

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章