-----------------------------目录-----------------------------------

第一部分:zookeeper简介

第二部分:zookeeper环境搭建

  1、单机环境

  2、集群环境

第三部分:zookeeper基本使用

  1、java原生zk客户端api操作

  2、zkClient客户端操作(推荐)

  3、curator客户端操作(推荐)

第四部分:zookeeper应用场景

第五部分:zookeeper深入进阶

第六部分:zookeeper源码分析

-----------------------------目录-----------------------------------

 

1、 zookeeper基本概念

zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂并且容易出差错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并提供给用户一些简单的接口,zookeeper是一个典型的分布式一致性的解决方案(CP模式),分布式应用程序可以基于它实现数据订阅/发布、负载均衡,命名服务、集群管理、分布式锁和分布式队列等功能。

2、基本概念

@1、集群角色

通常在分布式系统中,构成一个集群中的每一台机器都有自己的角色,典型的是master/slave模式(主备模式),这种情况下能够处理写操作的机器成为master机器,把所有通过一步复制方式获取最新数据并且提供服务的机器为slave机器。
在zookeeper中没有是用主备模式,引入Leader、Follower、Observer三种角色,在zk集群中所有的机器通过Leader选举来选Leader,Leader服务器为客户端提供读写服务,Follower和Observer都能提供读服务,唯一的区别是Observer不参与Leader选举,不参与写操作的过半写成功。

@2、会话session

Session指的是客户端的会话,一个客户端链接是指客户端和服务端之间的一个Tcp长连接,ZK对外服务端口默认为2181,客户端启动的时候会与服务器建立一个TCP连接,从第一次链接建立开始客户端会话的声明周期也开始了,通过链接,客户端能够检测心跳和服务端保持有效会话,也能够想zk服务器发送请求并接受响应,同时还能够通过该链接接受来自服务器的Watch事件通知。

@3、数据节点znode

在zk中节点分两类:第一类是指构成集群的机器,我们称为机器节点。第二种是指数据模型中的数据单元,称为数据节点znode。zk的数据存储在内存中,数据模型是树型,有斜杠(/)进行分割路径,就是一个znode,每个znode上都会保存自己的数据内容,同时还会保存一系列属性信息。

@4、版本

zk在每个数据节点都会存储数据,每个节点都会维护一个价stat的数据结构,记录了三个数据版本:version(当前版本)、cversion(当前节点子节点的版本),aversion(当前节点的ACL版本) 

@5、ACL

Zookeeper采用ACL(Access Control Lists)策略来进行权限控制,定义一下五种权限:

  CREATE:创建子节点权限

  READ:获取节点数据和子节点列表的权限

  WRITE:更新节点数据的权限

  DELETE:删除子节点的权限

  ADMIN:设置节点ACL的权限

注意的是create和delete两盒权限是针对子节点的权限控制

第二部分:zookeeper环境搭建

 zookeeper安装有一下三种方式

1、单机模式:zk只运行在一台服务器上,适用测试环境

2、集群模式:zk运行在一个集群环境中,适用于线上生产环境

3、伪集群模式:一台服务器上运行多个zk实例监听不同的端口

这里的环境搭建适用了两个模式,单机模式和伪集群模式

一、单机模式(Linux)

1、下载稳定版本https://zookeeper.apache.org/releases.html

下面是下载命令(注意版本号),也可以登录上面地址进行本地下载,在上传服务器

wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

2、解压

tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz

3、进入apache-zookeeper-3.6.1目录创建data文件,用于持久话数据的文件夹

cd apache-zookeeper-3.6.1

mkdir data

4、修改配置文件名称

zk默认读取zoo.cfg文件,提供参考文件zoo_sample.cfg文件。

cd conf
cp zoo_sample.cfg zoo.cfg

5、修改zoo.cfg文件中的dataDir属性,定位到刚创建的data文件

dataDir=/user/apache-zookeeper-3.6.1/data

6、启动服务

cd ../bin

./zkServer.sh start

上面start 命令为启动,stop为停止,status为查看zk启动状态以及身份

看到下图提示启动成功:

zookeeper、ZK安装、ZK配置、ZK使用

 二、伪集群模式

    点击--->>> ZK安装、ZK配置、ZK集群部署

第三部分:zookeeper基本使用

java代码调用zookeeper原生Api进行增删改查节点以及数据的代码

一、java调用原生zk客户端

引入pom文件

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.9</version>
        </dependency>

 1、创建zk的客户端连接

 主要是获取zk对象,并进行操作监听。等同于zk开启客户端命令./zkCli.sh。

 需要注意的是监听类实现Watcher接口,重写process 方法

package city.albert.api;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/7/29  6:39 PM
 */
public class OpenSession implements Watcher {

    /**
     * 应为zk的监听为独立线程,需要保持主线程main方法不会直接结束使用CountDownLatch,设置线程数为1
     */
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 建立会话
     * 客户端创建一个连接链接zk
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException {

        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new OpenSession());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
    }

    /**
     * 处理来自服务器端的watcher
     *
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println(watchedEvent.toString());
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process 执行了。。。");
            countDownLatch.countDown();
        }
    }
}

2、创建zk的数据节点

由于代码篇幅问题,下面先介绍重点代码然后附带测试的源代码~~注意哦!

类似在客户端中执行: create  /my_persist2  123

重点代码:

 private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
        /**
         * path 创建节点路径
         * data[]  节点值的字节数组
         *  acl  节点权限4中类型
         *       ANYONE_ID_UNSAFE  标识任何人
         *       AUTH_IDS    仅仅这个id才可以设置ACL,他将被客户端验证ID替换
         *       OPEN_ACL_UNSAFE  开放的ACL(常用)
         *       CREATOR_ALL_ACL  此ACL授予创建者身份验证ID的所有权限
         *  createMode  创建节点的4中类型
         *        PERSISTENT            创建持久节点
         *        PERSISTENT_SEQUENTIAL 创建持久顺序节点
         *        EPHEMERAL             创建临时几点
         *        EPHEMERAL_SEQUENTIAL  创建临时顺序节点
         *
         */
        String persistent = zooKeeper.create("/my_persistent", "创建持久节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String persistentSequential = zooKeeper.create("/my_persistent_sequential", "创建持久顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String ephemeral = zooKeeper.create("/my_ephemeral", "创建临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "创建临时顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("持久节点 "+persistent);
        System.out.println("持久顺序节点 "+persistentSequential);
        System.out.println("临时节点 "+ephemeral);
        System.out.println("临时顺序节点 "+ephemeralSequential);
    }

完整的test代码

package city.albert.api;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/7/30  3:30 PM
 */
public class CreateZNodeBySession implements Watcher {

   private static   ZooKeeper zooKeeper;

    /**
     * 建立会话
     * 客户端创建一个连接链接zk
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        //获取zk链接会话对象    ip+端口,超时时间   watcher回调函数
         zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new CreateZNodeBySession());

        System.out.println(zooKeeper.getState());

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 创建节点
     *
     * @param zooKeeper
     */
    private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
        /**
         * path 创建节点路径
         * data[]  节点值的字节数组
         *  acl  节点权限4中类型
         *       ANYONE_ID_UNSAFE  标识任何人
         *       AUTH_IDS    仅仅这个id才可以设置ACL,他将被客户端验证ID替换
         *       OPEN_ACL_UNSAFE  开放的ACL(常用)
         *       CREATOR_ALL_ACL  此ACL授予创建者身份验证ID的所有权限
         *  createMode  创建节点的4中类型
         *        PERSISTENT            创建持久节点
         *        PERSISTENT_SEQUENTIAL 创建持久顺序节点
         *        EPHEMERAL             创建临时几点
         *        EPHEMERAL_SEQUENTIAL  创建临时顺序节点
         *
         */
        String persistent = zooKeeper.create("/my_persistent", "创建持久节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String persistentSequential = zooKeeper.create("/my_persistent_sequential", "创建持久顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String ephemeral = zooKeeper.create("/my_ephemeral", "创建临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "创建临时顺序节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("持久节点 "+persistent);
        System.out.println("持久顺序节点 "+persistentSequential);
        System.out.println("临时节点 "+ephemeral);
        System.out.println("临时顺序节点 "+ephemeralSequential);
    }


    /**
     * 处理来自服务器端的watcher
     *
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println(watchedEvent.toString());
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process 执行了。。。");

            //创建节点
            try {
                createZNode(zooKeeper);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}
View Code

相关文章: