【问题标题】:RabbitMQ - How do you programatically receive messages from the message queue?RabbitMQ - 您如何以编程方式从消息队列接收消息?
【发布时间】:2018-07-23 23:29:15
【问题描述】:

这是我目前所拥有的。我已经能够成功地将消息发布到队列并通过 RabbitMQ 的管理控制台看到它在那里。

但是,当我尝试接收它时,它似乎根本没有触发回调函数。

这里是相关代码。

MessageQueue mq = new MessageQueue();

mq.Receive("task_queue", (model, ea) => {
    var message = Encoding.UTF8.GetString(ea.Body);
    System.Diagnostics.Debug.WriteLine(message);
});

这是我在MessageQueue 类中的Receive 函数:

    public void Receive(string queueName, EventHandler<BasicDeliverEventArgs> onReceived)
    {
        using (IConnection connection = GetConnection(myLocalhost))
        {
            using (IModel channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Don't dispatch a new message to a consumer until it has processed and acknowledged the previous one.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                var consumer = new EventingBasicConsumer(channel); // non-blocking

                // Set the event to be executed when receiving a message
                consumer.Received += onReceived;

                // Register a consumer to listen to a specific queue. 
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
            }
        }
    }

当我在队列中有内容时尝试运行Receive 函数时,没有任何内容打印到我的输出窗口。

谁能帮我解决这个问题?

更新

我将Receive 函数中的代码放在与调用它的代码相同的文件中。仍然没有运气。这排除了我认为的范围界定问题。我还尝试将Received 字段设置为实际的事件委托(而不是onReceive 函数,并让该函数调用另一个函数,我在其中放置了一个断点。该函数永远不会被击中,这让我相信我的事件委托回调是永远不会被调用。

我不知道为什么会这样。正如 RabbitMQ 管理控制台向我显示的那样,该消息仍在从队列中消耗。我还尝试将队列重命名为其他名称,以确保没有其他幻像服务正在从同一个队列中使用。没有雪茄。

更新 2

我尝试提取两个using 语句并在其中调用我的Receive 函数以保持范围,但这也不起作用。我什至将整个Receive 块中的代码提取到一个主函数中,现在它甚至没有从队列中消耗。

【问题讨论】:

  • 很可能,您的程序正在使用消息,但在调用回调之前退出。我会提供进一步的建议,但由于您没有为您的程序提供完整的源代码,我只是猜测。
  • 服务器是否显示正在消费的消息?它是否显示消费者已连接?
  • 是的,消息已被消费,但管理控制台中队列的Consumers 部分中似乎没有列出任何消费者。

标签: c# rabbitmq


【解决方案1】:

看看你上面的代码,你有一个非常简单的问题。

在您调用channel.BasicConsume 后的那一刻,整个事物(连接/通道)将超出范围并立即通过using 语句处理/销毁。

为防止这种情况发生,您需要在channel.BasicConsume 之后立即有一个无限循环,当然要在关闭程序时退出适当的逻辑。

while (_isRunning & channel.IsOpen) {
    Thread.Sleep(1);
    // Other application logic here; e.g. periodically break out of the
    // loop to prevent unacknowledged messages from accumulating in the system
    // (if you don't, random effects will guarantee that they eventually build up)
}

【讨论】:

  • 感谢您的回复。我理解这个问题,但添加无限循环似乎对我不起作用。在我调用Receive 函数的课堂上,之后我需要做其他工作。我不能让它挂在Receive 上。有没有办法让它成为非阻塞的?
  • 我开始认为代码的组织方式有点不正确。消息的发送可以简单地是一个函数调用,但消息的接收需要在主程序中作为一个单独的进程运行。
  • 这是一种选择,也可以使用EventingBasicConsumer
  • 等等,我正在使用EventingBasicConsumer。那为什么我调用Receive函数不能让它工作呢?
  • 您仍然需要从专用线程进行调度。没有办法解决这个问题。但是,我创建了一个更健壮、可观察的实现,请参阅this answer
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-08-09
  • 2011-07-10
  • 2012-11-16
  • 2012-10-03
  • 2010-09-19
相关资源
最近更新 更多