zkClient 是GitHub上的一个开源Zookeeper客户端项目,是由Datameer的工程师Stefan和Groschupf和PeteVoss一起开发的。zkClient在Zookeeper原生的API接口之进行了包装,是一个简易的Zookeeper客户端。同时,zkClient在内部实现了诸如Session超时重连,Watcher反复注册等功能,使得Zookeeper客户端的繁琐的细节对开发人员透明。

1. ZkClient的使用配置

在Maven中添加依赖

 <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.11</version>
            <exclusions>
                <exclusion>
                    <artifactId>zookeeper</artifactId>
                    <groupId>org.apache.zookeeper</groupId>
                </exclusion>
            </exclusions>
        </dependency>

2. 创建会话

在zkClient中,有如下7种方法创建一个会话:

public ZkClient(String serverstring)

public ZkClient(String zkServers, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)

public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

public ZkClient(IZkConnection connection)

public ZkClient(IZkConnection connection, int connectionTimeout)

public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)

zkClient构造方法参数说明如下:

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

之前我们提到Zookeeper会话建立的过程是一个异步的过程。对于Zookeeper 客户端的这个特点,开发人员需要自己来进行等待处理,而zkClient通过内部包装,将这个异步的会话过程同步化了,这对于开发者的使用来说是非常方便的。

我之前提到的,目前Zookeeper的节点内容只支持字节数组(byte[])类型。zkClient中定义了zkSerialzier接口,允许用户传入一个序列化的实现,如Hession或者Kryo,默认情况下,zkClient使用了Java自带的序列化方式进行对象的序列化。

ZkClient和Zookeeper原生构造方法最大区别,那就是在zkClient中的构造方法中,不再提供传入Watcher对象的参数了。zkClient中引入了大多数Java程序中使用的Listener来实现Watcher注册。

// 使用ZkClient来创建一个ZooKeeper客户端
public class CreateSession {
    public static void main(String[] args) throws IOException, InterruptedException {
    	ZkClient zkClient = new ZkClient(Constant.ZK_CONNECT_STRING, Constant.ZK_SESSION_TIMEOUT);
    	System.out.println("ZooKeeper session established.");
    }
}

运行结果:

ZooKeeper session established.

sessionTimeout指的就是会话宽限期时间。

3. 创建节点

zkClient中提供了一系列接口来创建节点

public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) 

public String create(final String path, Object data, final CreateMode mode)

public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode)

public void createPersistent(String path, Object data)

public void createPersistent(String path, Object data, List<ACL> acl) 

public String createPersistentSequential(String path, Object data)

public String createPersistentSequential(String path, Object data, List<ACL> acl)

public void createEphemeral(final String path, final Object data)

public void createEphemeral(final String path, final Object data, final List<ACL> acl)

public String createEphemeralSequential(final String path, final Object data)

public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl)

该API的方法的参数用法如下:

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

运行成功

4. 删除节点

在zkClient中提供了一系列接口来删除节点

public boolean delete(final String path)

public boolean delete(final String path, final int version)

public boolean deleteRecursive(String path) 

zkClient delete API参数说明

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

通过调用这个接口,就可以对指定节点的数据进行删除操作了。我们主要来看一下这个deleteRecursive接口。通过这个接口,我们就可以通过层次遍历来删除节点了。

//ZkClient删除节点数据
public class DelData {
	public static void main(String[] args) throws Exception {
		String path = "/zk-book";
    	ZkClient zkClient = new ZkClient(Constant.ZK_CONNECT_STRING, Constant.ZK_SESSION_TIMEOUT);
        //zkClient.createPersistent(path, "");
        //zkClient.createPersistent(path+"/c1", "");
        zkClient.deleteRecursive(path);
    }
}

运行结果:

删除成功

5. 读取数据

5.1 getchildren

在ZkClient 我们可以通过以下API来访问指定节点的列表:

protected List<String> getChildren(final String path, final boolean watch)

public List<String> getChildren(String path)

和Zookeeper原生API相比,ZkClient提供的API没有了Watcher注册的功能。zkClient中引入了Listener概念,客户端可以通过注册相关的事件监听来实现对Zookeeper服务器事件的订阅。在获取子节点的列表接口中,可以通过如下API来注册监听:

public List<String> subscribeChildChanges(String path, IZkChildListener listener)

public void unsubscribeChildChanges(String path, IZkChildListener childListener)

通过调用上面的API就可以完成了事件的监听的注册功能。从API方法中,注册时对子节点的列表更改的监听,也就是说,一旦子节点的列表发生变更,Zookeeper就会像客户端发出事件通知,由这个Listener来处理。下面我们给出这个IZkChildListener的定义:

public interface IZkChildListener {

    /**
     * Called when the children of the given path changed.
     * 
     * @param parentPath
     *            The parent path
     * @param currentChilds
     *            The children or null if the root node (parent path) was deleted.
     * @throws Exception
     */
    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
}

IZkChildListener APi参数说明:

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

// ZkClient获取子节点列表。
public class GetChildren {

    public static void main(String[] args) throws Exception {

    	String path = "/zk-book";
        ZkClient zkClient = new ZkClient(Constant.ZK_CONNECT_STRING, Constant.ZK_SESSION_TIMEOUT);
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
            }
        });
        
        zkClient.createPersistent(path);
        Thread.sleep( 1000 );
        System.out.println(zkClient.getChildren(path));
        Thread.sleep( 1000 );
        zkClient.createPersistent(path+"/c1");
        Thread.sleep( 1000 );
        zkClient.delete(path+"/c1");
        Thread.sleep( 1000 );
        zkClient.delete(path);
        Thread.sleep( Integer.MAX_VALUE );
    }
}

运行结果:

/zk-book 's child changed, currentChilds:[]
[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
  • 客户端可以通过对一个不存在的节点进行子节点变更的监听
  • 一旦客户单对一个节点注册了子节点列表变更的通知之后,那么该节点的子节点列表发生变更的时候,服务器都会通知客户端,并将最新的子节点列表发送给客户端。
  • 该节点本身的创建和删除也会通知客户端。

5.2 getData

在ZkClient 我们可以通过以下API来获取指定节点的数据:

public <T extends Object> T readData(String path)

public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) 

public <T extends Object> T readData(String path, Stat stat)

该API方法的参数说明如下:

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

在获取节点数据的同时,我们可以通过如下API,完成数据改变事件的监听:

public void subscribeDataChanges(String path, IZkDataListener listener)

通过调用上面的API就可以完成了事件的监听的注册功能。从API方法中,注册时节点的数据变化进行蒋婷,也就是说,一旦节点的数据发生变更,Zookeeper就会像客户端发出事件通知,由这个Listener来处理。下面我们给出这个IZkDataListener的定义:

public interface IZkDataListener {

    public void handleDataChange(String dataPath, Object data) throws Exception;

    public void handleDataDeleted(String dataPath) throws Exception;
}

IZkDataListener的API参数说明:

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

//ZkClient获取节点数据
public class GetData {
    public static void main(String[] args) throws Exception {
    	
    	String path = "/zk-book";
        ZkClient zkClient = new ZkClient(Constant.ZK_CONNECT_STRING, Constant.ZK_SESSION_TIMEOUT);
        zkClient.createEphemeral(path, "123");
        
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Node " + dataPath + " deleted.");
            }
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("Node " + dataPath + " changed, new data: " + data);
            }
        });
        
        System.out.println((String) zkClient.readData(path));
        zkClient.writeData(path,"456");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep( Integer.MAX_VALUE );
    }
}

运行结果:

123
Node /zk-book changed, new data: 456
Node /zk-book deleted.

5.3 更新数据

在ZkClient 我们可以通过以下API来更新指定节点的数据:

public void writeData(String path, Object object)

public void writeData(final String path, Object datat, final int expectedVersion) 

public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion)

zkClient writeData API参数说明

Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient

//ZkClient更新节点数据
public class SetData {

    public static void main(String[] args) throws Exception {
    	String path = "/zk-book";
    	ZkClient zkClient = new ZkClient(Constant.ZK_CONNECT_STRING, Constant.ZK_SESSION_TIMEOUT);
        zkClient.createEphemeral(path, new Integer(1));
        zkClient.writeData(path, new Integer(1));
    }
}

运行结果:

运行成功

5.4 判断节点是否存在

在ZkClient 我们可以通过以下API来判断节点是否存在:

public boolean exists(final String path) 
//ZkClient检测节点是否存在
public class ExistNode {
    public static void main(String[] args) throws Exception {
    	String path = "/zk-book";
    	ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181", 2000);
        System.out.println("Node " + path + " exists " + zkClient.exists(path));
    }
}

运行结果:

Node /zk-book exists false

相关文章: