在前面一章介绍了在.Net Core中如何使用RabbitMQ,至此入门的的部分就完成了,我们内心中一定还有很多疑问:如果多个消费者消费同一个队列怎么办?如果这几个消费者分任务的权重不同怎么办?怎么把同一个队列不同级别的任务分发给不同的消费者?如果消费者异常离线怎么办?不要着急,后面将慢慢解开面纱。我们将结合实际的应用场景来讲解更多的高级用法。

任务分发机制

设想如果把每个消息当做一个任务,生产者把任务发布到RabbitMQ,然后Consumer接收消息处理任务,如果我们发现一个Consumer不能完成任务处理怎么办呢,我们会增加Consumer的数量。由一个Consumer增加到两个Consumer,如图由C变为C1和C2共同来分单工作。如果C1和C2是完全一样的,那RabbitMQ会将任务平均分发到两个消费者。 

RabbitMQ消息队列(六)-消息任务分发与消息ACK确认机制(.Net Core版)

 

如下我们

新建ProductAckDemo开发布订阅内容

新建ConsumerAckDemo1和ConsumerAckDemo2项目来订阅同一个队列在接收到消息后sleep1秒模拟任务处理的时间。

ProductAckDemo代码,生产100条带编号的消息:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace ProductAckDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchange";
            String routeKeyName = "wytRouteKey";
            String message = "Task --";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct", durable: true, autoDelete: false, arguments:null);

                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    for (int i = 0; i < 100; i++)
                    {
                        Byte[] body = Encoding.UTF8.GetBytes(message+i);

                        channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName, basicProperties: properties, body: body);
                    }
                }
            }
        }
    }
}
View Code

相关文章: