Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:
由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。
所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。简单示例如下:
存放消息端(消息生产者):
-
package org.yamikaze.redis.messsage.queue; -
import org.yamikaze.redis.test.MyJedisFactory; -
import redis.clients.jedis.Jedis; -
import java.util.concurrent.TimeUnit; -
/** -
* 消息生产者 -
* @author yamikaze -
*/ -
public class Producer extends Thread { -
public static final String MESSAGE_KEY = "message:queue"; -
private Jedis jedis; -
private String producerName; -
private volatile int count; -
public Producer(String name) { -
this.producerName = name; -
init(); -
} -
private void init() { -
jedis = MyJedisFactory.getLocalJedis(); -
} -
public void putMessage(String message) { -
Long size = jedis.lpush(MESSAGE_KEY, message); -
System.out.println(producerName + ": 当前未被处理消息条数为:" + size); -
count++; -
} -
public int getCount() { -
return count; -
} -
@Override -
public void run() { -
try { -
while (true) { -
putMessage(StringUtils.generate32Str()); -
TimeUnit.SECONDS.sleep(1); -
} -
} catch (InterruptedException e) { -
} catch (Exception e) { -
e.printStackTrace(); -
} -
} -
public static void main(String[] args) throws InterruptedException{ -
Producer producer = new Producer("myProducer"); -
producer.start(); -
for(; ;) { -
System.out.println("main : 已存储消息条数:" + producer.getCount()); -
TimeUnit.SECONDS.sleep(10); -
} -
} -
}
消息处理端(消息消费者):
-
package org.yamikaze.redis.messsage.queue; -
import org.yamikaze.redis.test.MyJedisFactory; -
import redis.clients.jedis.Jedis; -
/** -
* 消息消费者 -
* @author yamikaze -
*/ -
public class Customer extends Thread{ -
private String customerName; -
private volatile int count; -
private Jedis jedis; -
public Customer(String name) { -
this.customerName = name; -
init(); -
} -
private void init() { -
jedis = MyJedisFactory.getLocalJedis(); -
} -
public void processMessage() { -
String message = jedis.rpop(Producer.MESSAGE_KEY); -
if(message != null) { -
count++; -
handle(message); -
} -
} -
public void handle(String message) { -
System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条"); -
} -
@Override -
public void run() { -
while (true) { -
processMessage(); -
} -
} -
public static void main(String[] args) { -
Customer customer = new Customer("yamikaze"); -
customer.start(); -
} -
}
但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:
1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
2)、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。
所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端可以将processMessage可以改为这样:
-
public void processMessage() { -
/** -
* brpop支持多个列表(队列) -
* brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。 -
* 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY -
* 0表示不限制等待,会一直阻塞在这儿 -
*/ -
List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey"); -
if(messages.size() != 0) { -
//由于该指令可以监听多个Key,所以返回的是一个列表 -
//列表由2项组成,1) 列表名,2)数据 -
String keyName = messages.get(0); -
//如果返回的是MESSAGE_KEY的消息 -
if(Producer.MESSAGE_KEY.equals(keyName)) { -
String message = messages.get(1); -
handle(message); -
} -
} -
System.out.println("======================="); -
}
然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前有两个连接。
发布/订阅模式
Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。
1)发布
PUBLISH指令可用于发布一条消息,格式 PUBLISH channel message
返回值表示订阅了该消息的数量。
2)订阅
SUBSCRIBE指令用于接收一条消息,格式 SUBSCRIBE channel
可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。回复分为三种类型:
1、如果为subscribe,第二个值表示订阅的频道,第三个值表示是第几个订阅的频道?(理解成序号?)
2、如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
3、如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。
可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。
Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:
再试试推送消息会得到以下结果:
可以看到publish指令返回的是2,而订阅端这边接收了两次消息。这是因为PSUBSCRIBE指令可以重复订阅频道。而使用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。同时PUNSUBSCRIBE指令通配符不会展开。
例如:PUNSUBSCRIBE * 不会匹配到 channel.*, 所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*。
代码示范如下:
-
package org.yamikaze.redis.messsage.subscribe; -
import org.yamikaze.redis.messsage.queue.StringUtils; -
import org.yamikaze.redis.test.MyJedisFactory; -
import redis.clients.jedis.Jedis; -
/** -
* 消息发布方 -
* @author yamikaze -
*/ -
public class Publisher { -
public static final String CHANNEL_KEY = "channel:message"; -
private Jedis jedis; -
public Publisher() { -
jedis = MyJedisFactory.getLocalJedis(); -
} -
public void publishMessage(String message) { -
if(StringUtils.isBlank(message)) { -
return; -
} -
jedis.publish(CHANNEL_KEY, message); -
} -
public static void main(String[] args) { -
Publisher publisher = new Publisher(); -
publisher.publishMessage("Hello Redis!"); -
} -
}
简单的发送一个消息。
消息订阅方:
-
package org.yamikaze.redis.messsage.subscribe; -
import org.yamikaze.redis.test.MyJedisFactory; -
import redis.clients.jedis.Jedis; -
import redis.clients.jedis.JedisPubSub; -
import java.util.concurrent.TimeUnit; -
/** -
* 消息订阅方客户端 -
* @author yamikaze -
*/ -
public class SubscribeClient { -
private Jedis jedis; -
private static final String EXIT_COMMAND = "exit"; -
public SubscribeClient() { -
jedis = MyJedisFactory.getLocalJedis(); -
} -
public void subscribe(String ...channel) { -
if(channel == null || channel.length <= 0) { -
return; -
} -
//消息处理,接收到消息时如何处理 -
JedisPubSub jps = new JedisPubSub() { -
/** -
* JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现 -
* 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage -
* 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法 -
* 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[] -
*/ -
@Override -
public void onMessage(String channel, String message) { -
if(Publisher.CHANNEL_KEY.equals(channel)) { -
System.out.println("接收到消息: channel : " + message); -
//接收到exit消息后退出 -
if(EXIT_COMMAND.equals(message)) { -
System.exit(0); -
} -
} -
} -
/** -
* 订阅时 -
*/ -
@Override -
public void onSubscribe(String channel, int subscribedChannels) { -
if(Publisher.CHANNEL_KEY.equals(channel)) { -
System.out.println("订阅了频道:" + channel); -
} -
} -
}; -
//可以订阅多个频道 当前线程会阻塞在这儿 -
jedis.subscribe(jps, channel); -
} -
public static void main(String[] args) { -
SubscribeClient client = new SubscribeClient(); -
client.subscribe(Publisher.CHANNEL_KEY); -
//并没有 unsubscribe方法 -
//相应的也没有punsubscribe方法 -
} -
}
先运行client,再运行Publisher进行消息发送,输出结果:
总结:
使用Redis的List数据结构可以简单迅速地做一个消息队列,同时Redis提供的BRPOP和BLPOP等指令解决了频繁调用Jedis的rpop和lpop方法造成的资源浪费问题。除此之外,Redis提供对发布/订阅模式的指令,可以实现消息传递、进程间通信。
自我总结:redis的list作为消息队列是可以存储消息的,但是如果使用分布/订阅模式的话,如果一旦订阅者服务器宕机,那么主题发布的消息,就会被订阅者漏掉消息,这样就造成了信息丢失。所以如果做大数据的消息列表还是得用redis的list。当然直接使用mq更好啦!