-----------------------------目录-----------------------------------
第一部分:zookeeper简介
第二部分:zookeeper环境搭建
1、单机环境
2、集群环境
第三部分:zookeeper基本使用
1、java原生zk客户端api操作
2、zkClient客户端操作(推荐)
3、curator客户端操作(推荐)
第四部分:zookeeper应用场景
第五部分:zookeeper深入进阶
第六部分:zookeeper源码分析
-----------------------------目录-----------------------------------
1、 zookeeper基本概念
zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂并且容易出差错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并提供给用户一些简单的接口,zookeeper是一个典型的分布式一致性的解决方案(CP模式),分布式应用程序可以基于它实现数据订阅/发布、负载均衡,命名服务、集群管理、分布式锁和分布式队列等功能。
2、基本概念
@1、集群角色
@2、会话session
@3、数据节点znode
@4、版本
@5、ACL
Zookeeper采用ACL(Access Control Lists)策略来进行权限控制,定义一下五种权限:
CREATE:创建子节点权限
READ:获取节点数据和子节点列表的权限
WRITE:更新节点数据的权限
DELETE:删除子节点的权限
ADMIN:设置节点ACL的权限
注意的是create和delete两盒权限是针对子节点的权限控制
第二部分:zookeeper环境搭建
zookeeper安装有一下三种方式
1、单机模式:zk只运行在一台服务器上,适用测试环境
2、集群模式:zk运行在一个集群环境中,适用于线上生产环境
3、伪集群模式:一台服务器上运行多个zk实例监听不同的端口
这里的环境搭建适用了两个模式,单机模式和伪集群模式
一、单机模式(Linux)
1、下载稳定版本https://zookeeper.apache.org/releases.html
下面是下载命令(注意版本号),也可以登录上面地址进行本地下载,在上传服务器
wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
2、解压
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
3、进入apache-zookeeper-3.6.1目录创建data文件,用于持久话数据的文件夹
cd apache-zookeeper-3.6.1 mkdir data
4、修改配置文件名称
zk默认读取zoo.cfg文件,提供参考文件zoo_sample.cfg文件。
cd conf cp zoo_sample.cfg zoo.cfg
5、修改zoo.cfg文件中的dataDir属性,定位到刚创建的data文件
dataDir=/user/apache-zookeeper-3.6.1/data
6、启动服务
cd ../bin ./zkServer.sh start
上面start 命令为启动,stop为停止,status为查看zk启动状态以及身份
看到下图提示启动成功:
二、伪集群模式
点击--->>> ZK安装、ZK配置、ZK集群部署
第三部分:zookeeper基本使用
java代码调用zookeeper原生Api进行增删改查节点以及数据的代码
一、java调用原生zk客户端
引入pom文件
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.9</version> </dependency>
1、创建zk的客户端连接
主要是获取zk对象,并进行操作监听。等同于zk开启客户端命令./zkCli.sh。
需要注意的是监听类实现Watcher接口,重写process 方法
package city.albert.api; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/7/29 6:39 PM */ public class OpenSession implements Watcher { /** * 应为zk的监听为独立线程,需要保持主线程main方法不会直接结束使用CountDownLatch,设置线程数为1 */ private static CountDownLatch countDownLatch = new CountDownLatch(1); /** * 建立会话 * 客户端创建一个连接链接zk * * @param args */ public static void main(String[] args) throws IOException, InterruptedException { ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new OpenSession()); System.out.println(zooKeeper.getState()); countDownLatch.await(); } /** * 处理来自服务器端的watcher * * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("process 执行了。。。"); countDownLatch.countDown(); } } }
2、创建zk的数据节点
由于代码篇幅问题,下面先介绍重点代码,然后附带测试的源代码~~注意哦!
类似在客户端中执行: create /my_persist2 123
重点代码:
private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { /** * path 创建节点路径 * data[] 节点值的字节数组 * acl 节点权限4中类型 * ANYONE_ID_UNSAFE 标识任何人 * AUTH_IDS 仅仅这个id才可以设置ACL,他将被客户端验证ID替换 * OPEN_ACL_UNSAFE 开放的ACL(常用) * CREATOR_ALL_ACL 此ACL授予创建者身份验证ID的所有权限 * createMode 创建节点的4中类型 * PERSISTENT 创建持久节点 * PERSISTENT_SEQUENTIAL 创建持久顺序节点 * EPHEMERAL 创建临时几点 * EPHEMERAL_SEQUENTIAL 创建临时顺序节点 * */ String persistent = zooKeeper.create("/my_persistent", "创建持久节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String persistentSequential = zooKeeper.create("/my_persistent_sequential", "创建持久顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); String ephemeral = zooKeeper.create("/my_ephemeral", "创建临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "创建临时顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("持久节点 "+persistent); System.out.println("持久顺序节点 "+persistentSequential); System.out.println("临时节点 "+ephemeral); System.out.println("临时顺序节点 "+ephemeralSequential); }
完整的test代码
package city.albert.api; import org.apache.zookeeper.*; import java.io.IOException; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/7/30 3:30 PM */ public class CreateZNodeBySession implements Watcher { private static ZooKeeper zooKeeper; /** * 建立会话 * 客户端创建一个连接链接zk * * @param args */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { //获取zk链接会话对象 ip+端口,超时时间 watcher回调函数 zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new CreateZNodeBySession()); System.out.println(zooKeeper.getState()); Thread.sleep(Integer.MAX_VALUE); } /** * 创建节点 * * @param zooKeeper */ private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { /** * path 创建节点路径 * data[] 节点值的字节数组 * acl 节点权限4中类型 * ANYONE_ID_UNSAFE 标识任何人 * AUTH_IDS 仅仅这个id才可以设置ACL,他将被客户端验证ID替换 * OPEN_ACL_UNSAFE 开放的ACL(常用) * CREATOR_ALL_ACL 此ACL授予创建者身份验证ID的所有权限 * createMode 创建节点的4中类型 * PERSISTENT 创建持久节点 * PERSISTENT_SEQUENTIAL 创建持久顺序节点 * EPHEMERAL 创建临时几点 * EPHEMERAL_SEQUENTIAL 创建临时顺序节点 * */ String persistent = zooKeeper.create("/my_persistent", "创建持久节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String persistentSequential = zooKeeper.create("/my_persistent_sequential", "创建持久顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); String ephemeral = zooKeeper.create("/my_ephemeral", "创建临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "创建临时顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("持久节点 "+persistent); System.out.println("持久顺序节点 "+persistentSequential); System.out.println("临时节点 "+ephemeral); System.out.println("临时顺序节点 "+ephemeralSequential); } /** * 处理来自服务器端的watcher * * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("process 执行了。。。"); //创建节点 try { createZNode(zooKeeper); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }