基于内存的单线程数据库,使Redis的线程安全性与性能极高。而Redis的双向链表数据类型(List)天生就可作为消息队列存储消息.
在这里就不说消息队列的等等一些优点。但是补充一下Redis的List类型的几个命令,你可以指定将一个元素投送到列表的头部(左边)或者尾部(右边),当然也可以指定从列表的头部或尾部取出数据.
LPush:添加元素至列表的头部
RPush:添加元素至列表的尾部
LPop:移除并获取列表的头部的第一个元素
RPop:移除并获取列表的尾部的第一个元素
BLpop:移出并获取列表头部的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。命令格式:blpop key timeout,当timeout=0时,表示一直阻塞等待,直到有其他客户端执行rpush或者lpush命令,插入数据后,阻塞才解除.
BRpop:与BLpop相同,不同的是它是移除列表尾部的第一个元素.
如下,开启两个客户端,一个客户端先使用BLpop阻塞读取数据,另一个客户端写入数据.
OK,到此我想你已经明白了,List的作用已经显而易见。生产者投入消息,消费者拿到消息。而且双向链表的数据类型,投入和拿取数据都特别灵活。是不是感觉很不错?接着往下看????
下面在代码中实现消息队列的数据投递与拾取.
引入NuGet包:StackExchange.Redis
生产者:
static void Main(string[] args)
{
var redis = new RedisContent();
for (int i = 0; i < 10; i++)
{
redis.db.ListRightPush("datalist", $"data{i}");//向列表的尾部投递消息
Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
Thread.Sleep(3000);
}
Console.ReadKey();
}
消费者:
static void Main(string[] args)
{
var redis = new RedisContent();
while (true)
{
string result = redis.db.ListRightPop("datalist");//从列表的尾部拾取消息
if (string.IsNullOrEmpty(result)) { }
else
{
Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result.ToString()}");
Thread.Sleep(1000);
}
}
}
先进先出和先进后出的实现方式都比较灵活,如果要想实现先进先出的规则的话,要将上面的消费者代码改为redis.db.ListLeftPop("datalist")=>从头部开始读取消息
但是上面的代码有一个很大的弊端,虽然消息已经消费完了,但是仍然在不停的lpop,所以造成很大的浪费.就算是这里使用了Sleep,一定程度上减少了CPU的占用率,但是消息处理的时效性就削弱了.
不用担心,对此肯定有解决的方法????,我们上面提到了Redis有两个阻塞命令BRpop与BLpop,这两个命令可以解决上述问题.有消息的话它就会帮你拿出来,而且不用while(true)的方式也会减少CPU的开销.因为列表没有消息的话,它就会一直阻塞,可以理解为保持了一个长连接(就相当于你问你女朋友为什么生气,然后她就说因为什么什么...,晚上你问她想吃点什么,然后她说想吃点什么什么...,你每次都要去问她,时间久了她就觉得很烦,会觉得你不懂她。所以你就住进她心里面,她心里面想什么你就能第一时间知道,用这个做比喻我相信你们都能懂????)。
但是StackExchange.Redis并没有提供BLpop与BRpop的API,我们可以使用使用pub/sub的方式.代码如下:
生产者:
static void Main(string[] args)
{
var redis = new RedisContent();
var sub = RedisContent.redis.GetSubscriber();
for (int i = 0; i < 10; i++)
{
sub.PublishAsync("datalist", $"data{i}").GetAwaiter();
Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
Thread.Sleep(3000);
}
Console.ReadKey();
}
消费者:
static void Main(string[] args)
{
var redis = new RedisContent();
var sub = RedisContent.redis.GetSubscriber();
sub.Subscribe("datalist", (channel, message) =>
{
Console.WriteLine($"{DateTime.Now} 已接收消息,Message={message}");
Thread.Sleep(1000);
});
Console.WriteLine("消费者0已启动成功!");
Console.ReadKey();
}
分别启动两个消费者客户端
这种为广播模式,每一个订阅者都会收到消息。但是该消息不保证是否被接收,生产者投递完消息如果没有消费者接收的话,消息会丢失.
还有一种方式消息不会丢失,将消息存在列表里面。首先生产者向列表投入数据,紧接着去通知订阅者,让订阅者从列表中取出数据。但是有一个弊端,如果有多个消费者订阅时,只有一个消费者能取到数据。代码如下:
生产者:
var redis = new RedisContent();
var sub = RedisContent.redis.GetSubscriber();
for (int i = 0; i < 10; i++)
{
redis.db.ListLeftPush("datalist", $"data{i}", flags: CommandFlags.FireAndForget);
sub.PublishAsync("channel1", "").GetAwaiter();
Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
Thread.Sleep(3000);
}
Console.ReadKey();
消费者:
static void Main(string[] args)
{
var redis = new RedisContent();
var sub = RedisContent.redis.GetSubscriber();
//如果消费者后启动,或者宕机重启,要先查询列表中是否有数据,如果有数据要消费掉
var len = redis.db.ListRange("datalist").Length;
if (len > 0)
{
Task.Run(() =>
{
for (int i = 0; i < len; i++)
{
string result = redis.db.ListRightPop("datalist");
//业务操作...
}
});
}
sub.Subscribe("channel1", (channel, message) =>
{
string result = redis.db.ListRightPop("datalist");
Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result}");
Thread.Sleep(1000);
});
Console.WriteLine("消费者0已启动成功!");
Console.ReadKey();
}
如有不足,请见谅!今天是十月二号,昨天是中秋佳节又是国庆,在这里祝大家双节快乐。本来昨天晚上写完这篇的,但是八九点的时候太困了,就????... ...
昨晚坐在窗边,偶然间向外面瞄了一眼,月亮的光芒太耀眼了,也太漂亮了,赶紧拍了一张????