转载请说明出处:https://blog.csdn.net/LiaoHongHB/article/details/84983529

在传统的单进程编程中,我们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。

分布式环境下,我们同样需要一个类似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是我们的分布式队列。

zookeeper可以通过顺序节点实现分布式队列。

架构图

基于zookeeper集群下的分布式队列

 

图中左侧代表zookeeper集群,右侧代表消费者和生产者。
生产者通过在queue节点下创建顺序节点来存放数据,消费者通过读取顺序节点来消费数据。

流程图

offer核心算法流程

基于zookeeper集群下的分布式队列

 

poll核心算法流程

基于zookeeper集群下的分布式队列

1、新建工程:zookeeper-queue

基于zookeeper集群下的分布式队列

 

ZookeeperQueue:

 

 

public class ZookeeperQueue {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private ZkClient zkClient = null;
    private int SESSIONTIMEOUT = 25000;
    private int CONNECTIONTIMEOUT = 25000;

    public ZookeeperQueue() {
    }

    public ZookeeperQueue(String ipAddress) {
        this.zkClient = new ZkClient(ipAddress, SESSIONTIMEOUT, CONNECTIONTIMEOUT, new SerializableSerializer());
    }

    public void start() {
        logger.info("ZookeeperQueue 服务启动,准备初始化...");
        init();
    }

    public void init() {
        logger.info("ZookeeperQueue 开始初始化...");
    }

    /**
     * 入队
     */
    public void offer(String nodeData) {
        boolean exists = zkClient.exists("/queue");
        if (!exists) {
            logger.info("创建{}节点", "/queue");
            zkClient.createPersistent("/queue");
        }
        String path = "/queue" + "/queue-";
        String ephemeralSequential = zkClient.createEphemeralSequential(path, nodeData);
        logger.info("{}创建{}队列节点", nodeData, ephemeralSequential);
    }

    public void poll() {
        //得到"/queue"的子节点
        List<String> children = zkClient.getChildren("/queue");
        //升序排序
        Collections.sort(children, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                o1 = StringUtils.substringAfter(o1, "-");
                o2 = StringUtils.substringAfter(o2, "-");
                int num1 = Integer.parseInt(o1);
                int num2 = Integer.parseInt(o2);
                if (num1 > num2) {
                    return 1;
                } else if (num1 < num2) {
                    return -1;
                } else {
                    return 0;
                }
            }
        });
        logger.info("节点{}的子节点有:{}", "/queue", children.toString());
        for (String string : children) {
            String path = "/queue/".concat(string);
            String data = zkClient.readData(path).toString();
            logger.info("节点:{}出队,信息为:{}", path, data);
            zkClient.delete(path);
        }
        logger.info("队列节点{}完成出队", "/queue");
    }

}

WebListener.class:

@Component
public class WebListener implements ServletContextListener {

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        ZookeeperQueue zookeeperQueue = new ZookeeperQueue("192.168.202.128:2181");
        zookeeperQueue.start();
        String nodaData1 = "nodaData1";
        String nodaData2 = "nodaData2";
        String nodaData3 = "nodaData3";
        zookeeperQueue.start();
        //生产者入队
        zookeeperQueue.offer(nodaData1);
        zookeeperQueue.offer(nodaData2);
        zookeeperQueue.offer(nodaData3);
        try{
            Thread.sleep(10000);
        }catch (Exception e) {
            e.printStackTrace();
        }
        //消费者出队
        zookeeperQueue.poll();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}

2、运行及运行结果:

基于zookeeper集群下的分布式队列

 

 

相关文章: