在前面一章介绍了在.Net Core中如何使用RabbitMQ,至此入门的的部分就完成了,我们内心中一定还有很多疑问:如果多个消费者消费同一个队列怎么办?如果这几个消费者分任务的权重不同怎么办?怎么把同一个队列不同级别的任务分发给不同的消费者?如果消费者异常离线怎么办?不要着急,后面将慢慢解开面纱。我们将结合实际的应用场景来讲解更多的高级用法。
任务分发机制
设想如果把每个消息当做一个任务,生产者把任务发布到RabbitMQ,然后Consumer接收消息处理任务,如果我们发现一个Consumer不能完成任务处理怎么办呢,我们会增加Consumer的数量。由一个Consumer增加到两个Consumer,如图由C变为C1和C2共同来分单工作。如果C1和C2是完全一样的,那RabbitMQ会将任务平均分发到两个消费者。
如下我们
新建ProductAckDemo开发布订阅内容
新建ConsumerAckDemo1和ConsumerAckDemo2项目来订阅同一个队列在接收到消息后sleep1秒模拟任务处理的时间。
ProductAckDemo代码,生产100条带编号的消息:
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace ProductAckDemo { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String routeKeyName = "wytRouteKey"; String message = "Task --"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "direct", durable: true, autoDelete: false, arguments:null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; for (int i = 0; i < 100; i++) { Byte[] body = Encoding.UTF8.GetBytes(message+i); channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName, basicProperties: properties, body: body); } } } } } }