转载请说明出处:https://blog.csdn.net/LiaoHongHB/article/details/84983529
在传统的单进程编程中,我们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。
分布式环境下,我们同样需要一个类似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是我们的分布式队列。
zookeeper可以通过顺序节点实现分布式队列。
架构图
图中左侧代表zookeeper集群,右侧代表消费者和生产者。
生产者通过在queue节点下创建顺序节点来存放数据,消费者通过读取顺序节点来消费数据。
流程图
offer核心算法流程
poll核心算法流程
1、新建工程:zookeeper-queue
ZookeeperQueue:
public class ZookeeperQueue {
private Logger logger = LoggerFactory.getLogger(getClass());
private ZkClient zkClient = null;
private int SESSIONTIMEOUT = 25000;
private int CONNECTIONTIMEOUT = 25000;
public ZookeeperQueue() {
}
public ZookeeperQueue(String ipAddress) {
this.zkClient = new ZkClient(ipAddress, SESSIONTIMEOUT, CONNECTIONTIMEOUT, new SerializableSerializer());
}
public void start() {
logger.info("ZookeeperQueue 服务启动,准备初始化...");
init();
}
public void init() {
logger.info("ZookeeperQueue 开始初始化...");
}
/**
* 入队
*/
public void offer(String nodeData) {
boolean exists = zkClient.exists("/queue");
if (!exists) {
logger.info("创建{}节点", "/queue");
zkClient.createPersistent("/queue");
}
String path = "/queue" + "/queue-";
String ephemeralSequential = zkClient.createEphemeralSequential(path, nodeData);
logger.info("{}创建{}队列节点", nodeData, ephemeralSequential);
}
public void poll() {
//得到"/queue"的子节点
List<String> children = zkClient.getChildren("/queue");
//升序排序
Collections.sort(children, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
o1 = StringUtils.substringAfter(o1, "-");
o2 = StringUtils.substringAfter(o2, "-");
int num1 = Integer.parseInt(o1);
int num2 = Integer.parseInt(o2);
if (num1 > num2) {
return 1;
} else if (num1 < num2) {
return -1;
} else {
return 0;
}
}
});
logger.info("节点{}的子节点有:{}", "/queue", children.toString());
for (String string : children) {
String path = "/queue/".concat(string);
String data = zkClient.readData(path).toString();
logger.info("节点:{}出队,信息为:{}", path, data);
zkClient.delete(path);
}
logger.info("队列节点{}完成出队", "/queue");
}
}
WebListener.class:
@Component
public class WebListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
ZookeeperQueue zookeeperQueue = new ZookeeperQueue("192.168.202.128:2181");
zookeeperQueue.start();
String nodaData1 = "nodaData1";
String nodaData2 = "nodaData2";
String nodaData3 = "nodaData3";
zookeeperQueue.start();
//生产者入队
zookeeperQueue.offer(nodaData1);
zookeeperQueue.offer(nodaData2);
zookeeperQueue.offer(nodaData3);
try{
Thread.sleep(10000);
}catch (Exception e) {
e.printStackTrace();
}
//消费者出队
zookeeperQueue.poll();
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
}
2、运行及运行结果: