【问题标题】:Consume multiple messages at a time一次使用多条消息
【发布时间】:2014-07-18 22:34:36
【问题描述】:

我正在使用外部服务(Service)来处理某些特定类型的对象。如果我以 10 个为一组发送对象,该服务的工作速度会更快。我当前的架构如下。生产者一个接一个地广播对象,而一群消费者(一个接一个)从队列中拉出它们并将它们发送到服务。这显然不是最理想的。

我不想修改生产者代码,因为它可以用于不同的情况。我可以修改消费者代码,但代价是额外的复杂性。我也知道prefetch_count 选项,但我认为它只适用于网络级别——客户端库(pika)不允许在消费者回调中一次获取多条消息。

那么,RabbitMQ 可以在将消息发送给消费者之前创建成批消息吗?我正在寻找“一次消费 n 条消息”之类的选项。 p>

【问题讨论】:

  • 那么prefetch_count 有什么问题?您可以使用N 的消息数量,对它们进行计数,一旦计数超过某个值,就一次性处理它们。此外,您还可以批量确认。
  • 如果队列中只有n < N 项怎么办?然后消费者将挂起一段不确定的时间,等待N - n 更多消息排队。就我而言,这是一个关键问题。
  • 您可以在每条消息之后检查队列长度,或者引入某种计时器,除了计数消息之外,它还会每 'K' 秒处理一次所有消息。
  • 是的,这正是我所说的“额外的复杂性”。不过谢谢你指出来。现在,我知道我至少有一个合理的解决方案。

标签: python rabbitmq pika


【解决方案1】:

您不能在消费者回调中批处理消息,但您可以使用线程安全库并使用多个线程来消费数据。这样做的好处是您可以在五个不同的线程上获取五条消息,并在需要时组合数据。

作为一个例子,你可以看看我将如何使用我的 AMQP 库来实现它。 https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py

【讨论】:

  • 我将使用它来创建自定义预取逻辑,谢谢
猜你喜欢
  • 2011-04-04
  • 2019-11-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-10-25
  • 2017-01-24
  • 1970-01-01
相关资源
最近更新 更多