Zookeeper入门实战(3)-Curator操作Zookeeper

Apache Curator是用于Apache ZooKeeper的一个Java客户端库;它包括一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之于ZooKeeper就像Cuava之于Java。

本文件主要介绍使用Curator操作Zookeeper,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、Junit 4.13。

1、引入依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>

2、基本操作

package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

/**
 * 使用Curator操作Zookeeper
 */
public class CuratorCase {
    //Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static String connectString = "10.49.196.10:2181";
    private static int sessionTimeout = 20 * 1000;
    private static int connectionTimeout = 10 * 1000;

    private CuratorFramework cf;
    @Before
    public void before() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000,3);
        cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();
    }

    @After
    public void after() throws Exception {
        cf.close();
    }

    /**
     * 创建节点
     */
    @Test
    public void create() throws Exception {
        /*
         * 同步创建节点
         * 1.除非指明创建节点的类型,默认是持久节点
         * 2.临时节点没有子节点;所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
         */
        //创建一个内容为空的节点
        cf.create().forPath("/curator/node1");
        //创建一个内容为aaa的节点
        cf.create().forPath("/curator/node2", "aaa".getBytes());
        //创建一个临时节点
        cf.create().withMode(CreateMode.EPHEMERAL).forPath("/curator/node3");
        //递归创建,最后的节点类型为临时节点
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator/node4/a/b");
        //创建一个节点,ACL为digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
        cf.create().withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg=")))).forPath("/curator/node5");

        /*
         * 异步创建节点
         *  可以指定线程池,不指定则使用Zookeeper的EventThread线程对事件进行串行处理
         */
        CountDownLatch counter = new CountDownLatch(2);
        cf.create().inBackground(new BackgroundCallback(){
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println(event);
                counter.countDown();
            }
        }, Executors.newFixedThreadPool(1)).forPath("/curator/node6");
        cf.create().inBackground(new BackgroundCallback(){
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println(event);
                counter.countDown();
            }
        }).forPath("/curator/node7");
        counter.await();
    }

    /**
     * 获取节点内容
     * @throws Exception
     */
    @Test
    public void getData() throws Exception {
        Stat stat = new Stat();
        byte[] bytes = cf.getData()
                .storingStatIn(stat)//状态,可选
                .forPath("/curator/node2");
        System.out.println("状态信息:" + stat);
        System.out.println("内容:" + new String(bytes));

        //异步获取数据
        CountDownLatch counter = new CountDownLatch(1);
        cf.getData().inBackground(new BackgroundCallback(){
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event:" + event);
                System.out.println("内容:"+ new String(event.getData()));
                counter.countDown();
            }
        }).forPath("/curator/node2");
        counter.await();
    }

    /**
     * 设置节点的值
     * @throws Exception
     */
    @Test
    public void setData() throws Exception {
        cf.setData()
                .withVersion(0) //指定版本,可选
                .forPath("/curator/node2", "测试修改".getBytes());
    }

    /**
     * 删除节点
     * @throws Exception
     */
    @Test
    public void delete() throws Exception {
        cf.delete()
                .guaranteed() //如果删除失败,只要会话有效就会不断的重试,直到删除成功为止
                .deletingChildrenIfNeeded()//删除子节点,可选
                .withVersion(0) //指定版本,可选
                .forPath("/curator/node4");
    }

    /**
     * 获取子节点
     * @throws Exception
     */
    @Test
    public void getChildren() throws Exception {
        List<String> list = cf.getChildren().forPath("/curator");
        System.out.println("子节点:" + list);
    }
}

3、监控数据变化

package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.*;

public class CuratorWatchCase {
    //Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static String connectString = "10.49.196.10:2181";
    private static int sessionTimeout = 20 * 1000;
    private static int connectionTimeout = 10 * 1000;

    private CuratorFramework cf;
    @Before
    public void before() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000,3);
        cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();
    }

    @After
    public void after() throws Exception {
        cf.close();
    }

    /**
     * 监控节点变化
     * @throws Exception
     */
    @Test
    public void watchNode() throws Exception {
        CountDownLatch counter = new CountDownLatch(1);

        NodeCache cache = new NodeCache(cf, "/curator/node2", false);
        cache.start();
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("路径为:" + cache.getCurrentData().getPath());
                System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
                System.out.println("状态为:" + cache.getCurrentData().getStat());

                //某种情况下退出监控
                //if (...) {
                //    counter.countDown();
                //}
            }
        });

        counter.await();
    }

    /**
     * 监控子节点变化
     * @throws Exception
     */
    @Test
    public void watchChildren() throws Exception {
        //使用自定义的线程池
        ExecutorService threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(32), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch counter = new CountDownLatch(1);

        PathChildrenCache cache = new PathChildrenCache(cf, "/curator/node2", true);
        cache.start();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED");
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED");
                        break;
                    default:
                        System.out.println(event.getType());
                }
                System.out.println("子节点信息:" + event.getData());

                //某种情况下退出监控
                //if (...) {
                //    counter.countDown();
                //}
            }
        }, threadPool);

        counter.await();
        threadPool.shutdownNow();
    }
}

可以看到不管是基本的增删改查还是监控数据变化,Curator都比原生的API好用很多。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章