Zookeeper入门实战(2)-原生API(Java)操作Zookeeper

本文主要介绍如何使用 Zookeeper 提供的原生 API 来操作 Zookeeper,文中使用的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、JUnit 4.13。

1、引入依赖

<!-- ZooKeeper client library: provides the org.apache.zookeeper classes imported by the examples below. -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

<!-- JUnit 4: supplies the org.junit @Before/@After/@Test annotations imported by ZookeeperCase. -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>

2、基本操作

package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Zookeeper基本操作列子
 */
public class ZookeeperCase {
    //Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static String connectString = "10.49.196.10:2181";
    private static int sessionTimeout = 2 * 1000;

    private ZooKeeper zooKeeper;

    @Before
    public void before() {
        try {
            zooKeeper =  new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {

                }
            });
            System.out.println(zooKeeper.getState());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @After
    public void after() throws Exception {
        zooKeeper.close();
    }

    /**
     * 创建节点
     */
    @Test
    public void create() throws Exception {
        /*
         * 同步创建持久节点,ACL为world:anyone:cdrwa
         * 等同于该命令:create /javatest/node1 test world:anyone:cdrwa
         */
        zooKeeper.create("/javatest/node1", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        /*
         * 同步创建持久节点,ACL为world:anyone:cr
         * 等同于该命令:create /javatest/node2 test world:anyone:cr
         */
        zooKeeper.create("/javatest/node2", "test".getBytes(), Collections.singletonList(new ACL((ZooDefs.Perms.CREATE + ZooDefs.Perms.READ), ZooDefs.Ids.ANYONE_ID_UNSAFE)), CreateMode.PERSISTENT);

        /*
         * 异步创建临时顺序节点,ACL为ip:127.0.0.1:c
         * 等同于该命令:create -s -e /javatest/node3 test ip:127.0.0.1:c
         */
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.create("/javatest/node3", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, new Id("ip", "127.0.0.1"))), CreateMode.EPHEMERAL_SEQUENTIAL
            ,new AsyncCallback.StringCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, String name) {
                    System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ", name=" + name);
                    counter.countDown();
                }
            }, "上下文对象,异步回调时会传递给callback");
        counter.await();

        /*
         * 同步创建持久节点,ACL为digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
         * 等同于该命令:create /javatest/node4 test digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
         * 添加认证用户(addauth digest jack:123456)后才能访问/javatest/node4
         */
        zooKeeper.create("/javatest/node4", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg="))) , CreateMode.PERSISTENT);

        /*
         * 同步创建顺序持久节点,ACL为world:anyone:cdrwa,存活时间为5秒
         * 等同于该命令:create -s -t 5000 /javatest/node5 test
         */
        Stat stat = new Stat();
        zooKeeper.create("/javatest/node5", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, stat, 5000);
        System.out.println(stat);
    }

    /**
     * 获取节点数据
     * @throws Exception
     */
    @Test
    public void getData() throws Exception {
        //同步读取数据
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/javatest/node1", false, stat);
        System.out.println(new String(data));
        System.out.println(stat);

        //异步读取数据
        zooKeeper.addAuthInfo("digest", "jack:123456".getBytes());
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.getData("/javatest/node4", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                String s = "";
                if (data != null) {
                    s = new String(data);
                }
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + s + ",stat=" + stat);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    @Test
    public void setData() throws Exception {
        //同步设置数据,version为-1表示匹配任何版本
        Stat stat = zooKeeper.setData("/javatest/node1", "test2".getBytes(), -1);
        System.out.println(stat);

        //异步设置数据
        zooKeeper.addAuthInfo("digest", "jack:123456".getBytes());
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.setData("/javatest/node4", "test2".getBytes(), -1, new AsyncCallback.StatCallback(){
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc=" + rc + ",path=" + path + ",stat=" + stat);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    @Test
    public void delete() throws Exception {
        //同步删除数据
        zooKeeper.delete("/javatest/node1", -1);

        //异步删除数据
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.delete("/javatest/node2", -1,  new AsyncCallback.VoidCallback(){
            @Override
            public void processResult(int rc, String path, Object ctx) {
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }
}

3、监控节点

DataMonitor类实现对节点的监控,节点有变化时会回调DataMonitorListener.process方法,该方法由调用方根据业务来实现;WatcherCase类传入需要的参数来启动DataMonitor。

该例子根据 ZooKeeper 官网的 Java 示例(Executor/DataMonitor)改造而来,比官网示例更简单一些。

package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Runnable {
    private ZooKeeper zk;
    private DataMonitorListener listener;

    /**
     * 节点变化时会回调该方法,把监控变化类型及新数据带过来
     */
    public interface DataMonitorListener {
        void process(WatchedEvent event, byte[] data);
    }

    public DataMonitor(String hostPort, String znode, DataMonitorListener listener) throws Exception {
        this.listener = listener;

        AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat);
                switch (rc) {
                    case KeeperException.Code.Ok:
                    case KeeperException.Code.NoNode:
                        return;
                    case KeeperException.Code.SessionExpired:
                    case KeeperException.Code.NoAuth:
                        close();
                        return;
                    default:
                        zk.exists(znode, true, this, null);
                        return;
                }
            }
        };
        //监视器
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event);
                if (event.getType() == Event.EventType.None) {
                    switch (event.getState()) {
                        case SyncConnected:
                            break;
                        case Expired:
                            close();
                            break;
                    }
                } else {
                    try {
                        byte[] bytes = zk.getData(event.getPath(), false, null);
                        listener.process(event, bytes);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    if (event.getPath() != null && event.getPath().equals(znode)) {
                        //再次监控
                        zk.exists(znode, true, callback, null);
                    }
                }
            }
        };

        zk = new ZooKeeper("10.49.196.10:2181", 20000, watcher);
        zk.exists(znode, true, callback, null);
    }

    @Override
    public void run() {
        try {
            synchronized (this) {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void close() {
        synchronized (this) {
            notifyAll();
        }
    }
}
package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;

/**
 * Node-watching sample: starts a {@link DataMonitor} on {@code /watchtest} and prints the
 * node's data, decoded as UTF-8, each time a data-changed event arrives.
 */
public class WatcherCase {
    /**
     * Creates the monitor and blocks in {@code run()} on the calling thread.
     *
     * @param args unused
     * @throws Exception if the monitor cannot be created
     */
    public static void main(String[] args) throws Exception {
        DataMonitor.DataMonitorListener listener = new DataMonitor.DataMonitorListener() {
            @Override
            public void process(WatchedEvent event, byte[] data) {
                // TODO: handle the event according to actual business needs.
                boolean dataChanged = event.getType() == Watcher.Event.EventType.NodeDataChanged;
                // Guard against a missing payload and decode with an explicit charset
                // (fully qualified so no extra import is required).
                if (dataChanged && data != null) {
                    System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8));
                }
            }
        };
        new DataMonitor("10.49.196.10:2181", "/watchtest", listener).run();
    }
}
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章