ZooKeeper常用客户端有三种:原生客户端、zkClient、curator
项目中使用前,需要导入相关依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
原生客户端
创建会话
不使用监听
public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
@Test
public void createSession2() throws IOException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, null);
System.out.println("zk.getState() = " + zk.getState());
}
}
zk.getState() = CONNECTING
通过之前的学习可以知道,CONNECTING标志客户端正在连接,并不能确保已经连接上zk服务。可能发生还没有连接到zk服务就进行对zk访问的情况
使用监听
public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
/*倒计时器*/
private CountDownLatch latch = new CountDownLatch(1);
@Test
public void createSession() throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected){/*确保zk已连接*/
latch.countDown();
}
}
});
latch.await();
System.out.println("zk.getState() = " + zk.getState());
}
}
zk.getState() = CONNECTED
使用监听机制可以确保在ZooKeeper初始化完成前进行等待,初始化完成再进行后续操作
客户端基本操作
1 public class TestJavaApi implements Watcher { 2 /*zk服务地址*/ 3 private static final String ZK_SERVER = "127.0.0.1:2181"; 4 /*会话连接超时时间*/ 5 private static final int SESSION_TIMEOUT = 50000; 6 /*指定目录【节点】*/ 7 private static final String ZK_PATH = "/zkDir"; 8 /*客户端连接会话*/ 9 private ZooKeeper zk = null; 10 11 /*倒计时器*/ 12 private CountDownLatch latch = new CountDownLatch(1); 13 /** 14 * 事件被触发时的动作 15 * @param event 事件 16 */ 17 @Override 18 public void process(WatchedEvent event) { 19 System.out.println("收到事件通知:" + zk.getState() +"\n"); 20 if (event.getState() == Event.KeeperState.SyncConnected){ 21 latch.countDown(); 22 } 23 } 24 25 /** 26 * 创建zk会话连接 27 * @param connectString zk服务器地址列表,可以是"地址1,地址2,...." 28 * @param sessionTimeout Session超时时间 29 */ 30 public void createZkSession(String connectString, int sessionTimeout){ 31 try { 32 zk = new ZooKeeper(connectString,sessionTimeout,this); 33 latch.await(); 34 System.out.println("zk.getState() = " + zk.getState()); 35 } catch (IOException|InterruptedException e) { 36 System.out.println("连接创建失败"); 37 e.printStackTrace(); 38 } 39 } 40 41 /** 42 * 关闭zk会话 43 */ 44 public void releaseSession(){ 45 try { 46 zk.close(); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 } 51 52 /** 53 * 创建节点【目录、文件】 54 * @param path 节点 55 * @param data 节点数据 56 * @return 57 */ 58 public boolean createNode(String path,String data){ 59 try { 60 String node = zk.create(path/*节点path*/, 61 data.getBytes()/*节点数据*/, 62 ZooDefs.Ids.OPEN_ACL_UNSAFE/*权限控制 OPEN_ACL_UNSAFE相当于world:anyone*/, 63 CreateMode.EPHEMERAL)/*临时节点*/; 64 System.out.println("节点创建成功,node = " + node); 65 return true; 66 } catch (KeeperException|InterruptedException e) { 67 System.out.println("节点创建失败"); 68 e.printStackTrace(); 69 } 70 return false; 71 } 72 73 /** 74 * 获取节点数据 75 * @param path 节点路径 76 * @return 77 */ 78 public String readNode(String path){ 79 try { 80 byte[] data = zk.getData(path, true, null); 81 String nodeData = new String(data,"utf-8"); 82 //System.out.println("获取"+path+"节点数据:"+nodeData); 83 return nodeData; 84 } catch (KeeperException | InterruptedException | UnsupportedEncodingException e) { 85 e.printStackTrace(); 86 return null; 87 } 88 } 89 90 /** 91 * 修改节点数据 92 * @param path 节点path 93 * @param newData 节点新数据 94 * @return 95 */ 96 public boolean writeNode(String path,String newData){ 97 try { 98 Stat stat = zk.setData(path, newData.getBytes(), -1); 99 System.out.println("节点["+path+"]修改成功"); 100 return true; 101 } catch (KeeperException|InterruptedException e) { 102 e.printStackTrace(); 103 } 104 return false; 105 } 106 107 /** 108 * 删除指定节点 109 * @param path 节点path 110 */ 111 public void deleteNode(String path){ 112 try { 113 zk.delete(path,-1); 114 System.out.println("节点["+path+"]删除成功"); 115 } catch (InterruptedException|KeeperException e) { 116 System.out.println("节点["+path+"]删除失败"); 117 e.printStackTrace(); 118 } 119 } 120 121 public static void main(String[] args) { 122 TestJavaApi api = new TestJavaApi(); 123 api.createZkSession(ZK_SERVER,SESSION_TIMEOUT); 124 if(api.createNode(ZK_PATH,"初始节点内容")){ 125 System.out.println("第一次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH)); 126 api.writeNode(ZK_PATH,"修改ZK_PATH节点数据"); 127 System.out.println("第二次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH)); 128 api.deleteNode(ZK_PATH); 129 } 130 api.releaseSession(); 131 } 132 } 133 /** 134 ************输出结果*********** 135 收到事件通知:CONNECTED 136 137 zk.getState() = CONNECTED 138 节点创建成功,node = /zkDir 139 第一次读/zkDir节点数据:初始节点内容 140 收到事件通知:CONNECTED 141 142 节点[/zkDir]修改成功 143 第二次读/zkDir节点数据:修改ZK_PATH节点数据 144 收到事件通知:CONNECTED 145 146 节点[/zkDir]删除成功 147 */