ZooKeeper常用客户端有三种:原生客户端、zkClient、curator

项目中使用前,需要导入相关依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.12</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>

原生客户端

创建会话

不使用监听

public class TestCreateSession {
    /*服务地址*/
    private static final String ZK_SERVER = "127.0.0.1:2181";
    @Test
    public void createSession2() throws IOException {
        ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, null);
        System.out.println("zk.getState() = " + zk.getState());
    }
}

zk.getState() = CONNECTING 

通过之前的学习可以知道,CONNECTING标志客户端正在连接,并不能确保已经连接上zk服务。可能发生还没有连接到zk服务就进行对zk访问的情况

使用监听

public class TestCreateSession {
    /*服务地址*/
    private static final String ZK_SERVER = "127.0.0.1:2181";
    /*倒计时器*/
    private CountDownLatch latch = new CountDownLatch(1);
    @Test
    public void createSession() throws IOException, InterruptedException {
        ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected){/*确保zk已连接*/
                    latch.countDown();
                }
            }
        });
        latch.await();
        System.out.println("zk.getState() = " + zk.getState());
    }
}

zk.getState() = CONNECTED

使用监听机制可以确保在ZooKeeper初始化完成前进行等待,初始化完成再进行后续操作

客户端基本操作

  1 public class TestJavaApi implements Watcher {
  2     /*zk服务地址*/
  3     private static final String ZK_SERVER = "127.0.0.1:2181";
  4     /*会话连接超时时间*/
  5     private static final int SESSION_TIMEOUT = 50000;
  6     /*指定目录【节点】*/
  7     private static final String ZK_PATH = "/zkDir";
  8     /*客户端连接会话*/
  9     private ZooKeeper zk = null;
 10 
 11     /*倒计时器*/
 12     private CountDownLatch latch = new CountDownLatch(1);
 13     /**
 14      * 事件被触发时的动作
 15      * @param event 事件
 16      */
 17     @Override
 18     public void process(WatchedEvent event) {
 19         System.out.println("收到事件通知:" + zk.getState() +"\n");
 20         if (event.getState() == Event.KeeperState.SyncConnected){
 21             latch.countDown();
 22         }
 23     }
 24 
 25     /**
 26      * 创建zk会话连接
 27      * @param connectString     zk服务器地址列表,可以是"地址1,地址2,...."
 28      * @param sessionTimeout    Session超时时间
 29      */
 30     public void createZkSession(String connectString, int sessionTimeout){
 31         try {
 32             zk = new ZooKeeper(connectString,sessionTimeout,this);
 33             latch.await();
 34             System.out.println("zk.getState() = " + zk.getState());
 35         } catch (IOException|InterruptedException e) {
 36             System.out.println("连接创建失败");
 37             e.printStackTrace();
 38         }
 39     }
 40 
 41     /**
 42      * 关闭zk会话
 43      */
 44     public void releaseSession(){
 45         try {
 46             zk.close();
 47         } catch (InterruptedException e) {
 48             e.printStackTrace();
 49         }
 50     }
 51 
 52     /**
 53      * 创建节点【目录、文件】
 54      * @param path  节点
 55      * @param data  节点数据
 56      * @return
 57      */
 58     public boolean createNode(String path,String data){
 59         try {
 60             String node = zk.create(path/*节点path*/,
 61                     data.getBytes()/*节点数据*/,
 62                     ZooDefs.Ids.OPEN_ACL_UNSAFE/*权限控制  OPEN_ACL_UNSAFE相当于world:anyone*/,
 63                     CreateMode.EPHEMERAL)/*临时节点*/;
 64             System.out.println("节点创建成功,node = " + node);
 65             return true;
 66         } catch (KeeperException|InterruptedException e) {
 67             System.out.println("节点创建失败");
 68             e.printStackTrace();
 69         }
 70         return false;
 71     }
 72 
 73     /**
 74      * 获取节点数据
 75      * @param path  节点路径
 76      * @return
 77      */
 78     public String readNode(String path){
 79         try {
 80             byte[] data = zk.getData(path, true, null);
 81             String nodeData = new String(data,"utf-8");
 82             //System.out.println("获取"+path+"节点数据:"+nodeData);
 83             return nodeData;
 84         } catch (KeeperException | InterruptedException | UnsupportedEncodingException e) {
 85             e.printStackTrace();
 86             return null;
 87         }
 88     }
 89 
 90     /**
 91      * 修改节点数据
 92      * @param path      节点path
 93      * @param newData   节点新数据
 94      * @return
 95      */
 96     public boolean writeNode(String path,String newData){
 97         try {
 98             Stat stat = zk.setData(path, newData.getBytes(), -1);
 99             System.out.println("节点["+path+"]修改成功");
100             return true;
101         } catch (KeeperException|InterruptedException e) {
102             e.printStackTrace();
103         }
104         return false;
105     }
106 
107     /**
108      * 删除指定节点
109      * @param path  节点path
110      */
111     public void deleteNode(String path){
112         try {
113             zk.delete(path,-1);
114             System.out.println("节点["+path+"]删除成功");
115         } catch (InterruptedException|KeeperException e) {
116             System.out.println("节点["+path+"]删除失败");
117             e.printStackTrace();
118         }
119     }
120 
121     public static void main(String[] args) {
122         TestJavaApi api = new TestJavaApi();
123         api.createZkSession(ZK_SERVER,SESSION_TIMEOUT);
124         if(api.createNode(ZK_PATH,"初始节点内容")){
125             System.out.println("第一次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH));
126             api.writeNode(ZK_PATH,"修改ZK_PATH节点数据");
127             System.out.println("第二次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH));
128             api.deleteNode(ZK_PATH);
129         }
130         api.releaseSession();
131     }
132 }
133 /**
134 ************输出结果***********
135     收到事件通知:CONNECTED
136 
137     zk.getState() = CONNECTED
138     节点创建成功,node = /zkDir
139     第一次读/zkDir节点数据:初始节点内容
140     收到事件通知:CONNECTED
141 
142     节点[/zkDir]修改成功
143     第二次读/zkDir节点数据:修改ZK_PATH节点数据
144     收到事件通知:CONNECTED
145 
146     节点[/zkDir]删除成功
147 */
View Code

相关文章: