类似于MQ的主题模式-只能消费订阅之后发布的消息,一个消息可以被多个订阅者消费)

1.客户端发布/订阅

1.1   普通的发布/订阅

   除了实现任务队列外,redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式。"发布/订阅"模式同样可以实现进程间的消息传递,其原理如下:

  "发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。

(1)发布消息

  发布者发布消息的命令是  publish,用法是 publish channel message,如向 channel1.1说一声hi

127.0.0.1:6379> publish channel:1 hi
(integer) 0

这样消息就发出去了。返回值表示接收这条消息的订阅者数量。发出去的消息不会被持久化,也就是有客户端订阅channel:1后只能接收到后续发布到该频道的消息,之前的就接收不到了。

 

(2)订阅频道

  订阅频道的命令是 subscribe,可以同时订阅多个频道,用法是 subscribe channel1 [channel2 ...],例如新开一个客户端订阅上面频道:(不会收到消息,因为不会收到订阅之前就发布到该频道的消息)

127.0.0.1:6379> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1

执行上面命令客户端会进入订阅状态,处于此状态下客户端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe这四个属于"发布/订阅"之外的命令,否则会报错。

  进入订阅状态后客户端可能收到3种类型的回复。每种类型的回复都包含3个值,第一个值是消息的类型,根据消类型的不同,第二个和第三个参数的含义可能不同。

消息类型的取值可能是以下3个:

  (1)subscribe。表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。

  (2)message。表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。

  (3)unsubscribe。表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。

 

(3)第一个客户端重新向channel:1发送一条消息

127.0.0.1:6379> publish channel:1 hi
(integer) 1

 

返回值表示订阅此频道的数量

c

上面订阅的客户端:

127.0.0.1:6379> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1
1) "message"
2) "channel:1"
3) "hi"

红字部分表示成功的收到消息(依次是消息类型,频道,消息内容)

 

1.2   按照规则发布/订阅

  除了可以使用subscribe命令订阅指定的频道外,还可以使用psubscribe命令订阅指定的规则。规则支持通配符格式。命令格式为      psubscribe pattern [pattern ...]订阅多个模式的频道。

  通配符中?表示1个占位符,*表示任意个占位符(包括0),?*表示1个以上占位符。

例如:

(1)订阅者订阅三个通配符频道

127.0.0.1:6379> psubscribe c? b* d?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?"
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3

 

(2)新开一个客户端发送到指定频道

C:\Users\liqiang>redis-cli
127.0.0.1:6379> publish c m1
(integer) 0
127.0.0.1:6379> publish c1 m1
(integer) 1
127.0.0.1:6379> publish c11 m1
(integer) 0
127.0.0.1:6379> publish b m1
(integer) 1
127.0.0.1:6379> publish b1 m1
(integer) 1
127.0.0.1:6379> publish b11 m1
(integer) 1
127.0.0.1:6379> publish d m1
(integer) 0
127.0.0.1:6379> publish d1 m1
(integer) 1
127.0.0.1:6379> publish d11 m1
(integer) 1

上面返回值为1表示被订阅者所接受,可以匹配上面的通配符。

 

订阅者客户端:

127.0.0.1:6379> psubscribe c? b* d?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?"
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3
1) "pmessage"
2) "c?"
3) "c1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b"
4) "m1"
1) "pmessage"
2) "b*"
3) "b1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b11"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d1"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d11"
4) "m1"

 

 

 

注意:

(1)使用psubscribe命令可以重复订阅同一个频道,如客户端执行了psubscribe c? c?*。这时向c1发布消息客户端会接受到两条消息,而同时publish命令的返回值是2而不是。.同样的,如果有另一个客户端执行了subscribe c1 和psubscribe c?*的话,向c1发送一条消息该客户顿也会受到两条消息(但是是两种类型:message和pmessage),同时publish命令也返回2.

(2)punsubscribe命令可以退订指定的规则,用法是: punsubscribe [pattern [pattern ...]],如果没有参数则会退订所有规则。

(3)使用punsubscribe只能退订通过psubscribe命令订阅的规则,不会影响直接通过subscribe命令订阅的频道;同样unsubscribe命令也不会影响通过psubscribe命令订阅的规则。另外需要注意punsubscribe命令退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以punsubscribe * 无法退订c*规则,而是必须使用punsubscribe c*才可以退订。

 

2.Java程序实现发布者订阅者模式

1.生产者

import redis.clients.jedis.Jedis;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:29 2018/10/9
 */
public class MessageProducer extends Thread {
    public static final String CHANNEL_KEY = "channel:1";
    private volatile int count;

    public void putMessage(String message) {
        Jedis jedis = JedisPoolUtils.getJedis();
        Long publish = jedis.publish(CHANNEL_KEY, message);//返回订阅者数量
        System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish);
        count++;
    }

    @Override
    public synchronized void run() {
        for (int i = 0; i < 5; i++) {
            putMessage("message" + count);
        }
    }

    public static void main(String[] args) {
        MessageProducer messageProducer = new MessageProducer();
        Thread t1 = new Thread(messageProducer, "thread1");
        Thread t2 = new Thread(messageProducer, "thread2");
        Thread t3 = new Thread(messageProducer, "thread3");
        Thread t4 = new Thread(messageProducer, "thread4");
        Thread t5 = new Thread(messageProducer, "thread5");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
    }
}

结果:

thread1 put message,count=0,subscriberNum=0
thread1 put message,count=1,subscriberNum=0
thread1 put message,count=2,subscriberNum=0
thread1 put message,count=3,subscriberNum=0
thread1 put message,count=4,subscriberNum=0
thread4 put message,count=5,subscriberNum=0
thread4 put message,count=6,subscriberNum=0
thread4 put message,count=7,subscriberNum=0
thread4 put message,count=8,subscriberNum=0
thread4 put message,count=9,subscriberNum=0
thread5 put message,count=10,subscriberNum=0
thread5 put message,count=11,subscriberNum=0
thread5 put message,count=12,subscriberNum=0
thread5 put message,count=13,subscriberNum=0
thread5 put message,count=14,subscriberNum=0
thread2 put message,count=15,subscriberNum=0
thread2 put message,count=16,subscriberNum=0
thread2 put message,count=17,subscriberNum=0
thread2 put message,count=18,subscriberNum=0
thread2 put message,count=19,subscriberNum=0
thread3 put message,count=20,subscriberNum=0
thread3 put message,count=21,subscriberNum=0
thread3 put message,count=22,subscriberNum=0
thread3 put message,count=23,subscriberNum=0
thread3 put message,count=24,subscriberNum=0
View Code

相关文章: