【问题标题】:RabbitMQ dequeue multiple messages at timeRabbitMQ 一次出列多条消息
【发布时间】:2019-11-24 23:19:44
【问题描述】:

我在 .net 核心(来自 NuGet 的最新版本)中使用 RabbitMQ。我有一个队列,有优先权。

我的代码正在插入 10 条消息并从队列中取出 10 条消息。为了使消息出队,我使用了EventingBasicConsumer,它在 Push 上获取消息。

这是我的代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace Test
{
    class Program
    {
        public static IConnection Connection = GetConnection("localhost", "xxx", "xxx");
        public static IModel Channel = Connection.CreateModel();
        public static void Main(string[] args)
        {
            IDictionary<String, Object> args2 = new Dictionary<String, Object>();
            args2.Add("x-max-priority", 256);
            Channel.QueueDeclare("IDG", true, false, false, args2);

            for (int i = 0; i < 10; i++)
            {
                Send("IDG", (i % 10).ToString(), (byte)(i % 10));
                Console.WriteLine("Queued: "+ (i % 10).ToString());
            }

            Receive("IDG", Channel);
            Console.ReadLine();
        }

        public static IConnection GetConnection(string hostName, string userName, string password)
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = hostName;
            connectionFactory.UserName = userName;
            connectionFactory.Password = password;
            return connectionFactory.CreateConnection();
        }

        public static void Send(string queue, string data, byte priority)
        {
            var properties = Channel.CreateBasicProperties();
            properties.Priority = priority;
            Channel.BasicPublish(string.Empty, queue, properties, Encoding.UTF8.GetBytes(data));
        }

        public static void Receive(string queue, IModel channel)
        {
            IDictionary<String, Object> args2 = new Dictionary<String, Object>();
            args2.Add("x-max-priority", 256);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += Consumer1_Received;
            channel.BasicConsume(consumer, queue, autoAck: false, arguments: args2);
        }

        private static void Consumer1_Received(object sender, BasicDeliverEventArgs e)
        {
            var message = Encoding.UTF8.GetString(e.Body);
            Console.WriteLine($"{DateTime.Now}: [x] Received {message}");

            // Simulate Processing...
            Thread.Sleep(3000);

            Channel.BasicAck(e.DeliveryTag, false);
        }
    }
}

输出:

Queued: 0
Queued: 1
Queued: 2
Queued: 3
Queued: 4
Queued: 5
Queued: 6
Queued: 7
Queued: 8
Queued: 9
16-Jul-19 11:37:59 AM: [x] Received 9
16-Jul-19 11:38:02 AM: [x] Received 8
16-Jul-19 11:38:05 AM: [x] Received 7
16-Jul-19 11:38:08 AM: [x] Received 6
16-Jul-19 11:38:11 AM: [x] Received 5
16-Jul-19 11:38:14 AM: [x] Received 4
16-Jul-19 11:38:23 AM: [x] Received 3
16-Jul-19 11:38:23 AM: [x] Received 2
16-Jul-19 11:38:23 AM: [x] Received 1
16-Jul-19 11:38:23 AM: [x] Received 0

处理每条消息需要 3 秒(参见 Consumer1_Received 委托中的 Thread.Sleep(3000))。

问题

我想同时处理多条消息。假设我的机器可以同时处理 5 条消息。因此,处理全部 10 条消息大约需要 6 秒。

  1. 如何定义同时运行消息的一致性数量 时间?
  2. 如何拉取N 消息?我看到了BasicGet() 方法,它拉出一条消息。这是否可以拉出比消息更多的内容?

更新 1

我已经尝试过使用多个消费者,但是吞吐量是一样的。这是代码:

public static void Receive(string queue, IModel channel)
{
    IDictionary<String, Object> args2 = new Dictionary<String, Object>();
    args2.Add("x-max-priority", 256);

    channel.BasicQos(0, 1, true);

    var consumer1 = new EventingBasicConsumer(channel);
    consumer1.Received += Consumer_Received;
    channel.BasicConsume(consumer1, queue, autoAck: false, arguments: args2);

    var consumer2 = new EventingBasicConsumer(channel);
    consumer2.Received += Consumer_Received;
    channel.BasicConsume(consumer2, queue, autoAck: false, arguments: args2);
}

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var message = Encoding.UTF8.GetString(e.Body);
    Console.WriteLine($"{DateTime.Now}: [x] Received {message}");

    // Simulate Processing...
    Thread.Sleep(3000);

    Channel.BasicAck(e.DeliveryTag, false);
}

输出是:(仍然是3秒\消息)

16-Jul-19 12:29:30 PM: [x] Received 6
16-Jul-19 12:29:33 PM: [x] Received 5
16-Jul-19 12:29:36 PM: [x] Received 4
16-Jul-19 12:29:39 PM: [x] Received 3
16-Jul-19 12:29:45 PM: [x] Received 2
16-Jul-19 12:29:51 PM: [x] Received 1
16-Jul-19 12:30:00 PM: [x] Received 0

【问题讨论】:

  • 运行 5 个消费者!!。每个消费者一次将处理一条消息。将prefetch_count 设置为 1。
  • 嗨@bumblebee,请参阅update1。我已经尝试过了 - 给了我同样的结果。

标签: c# rabbitmq


【解决方案1】:

您使用相同的IModel 实例进行消费和生产。创建两个单独的模型。 Thread.Sleep(3000) 阻止用于接收新消息的专用 rabbitmq 线程。

【讨论】:

  • 将 IModel 拆分为消费者和生产者会有所不同。谢谢!
猜你喜欢
  • 2023-03-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-08-06
  • 2014-07-18
  • 1970-01-01
  • 1970-01-01
  • 2021-07-28
相关资源
最近更新 更多