【问题标题】:RabbitMQ + kombu: write/read to one-time use queues with random namesRabbitMQ + kombu:使用随机名称写入/读取一次性使用队列
【发布时间】:2018-03-12 16:13:06
【问题描述】:

我不熟悉消息交换,但在为该任务寻找合适的手册时遇到了问题。

我需要组织队列池,这样:

  1. 生产者创建一些随机的空队列并在那里写入所有消息包(通常为 100 条消息)。

  2. 消费者找到非空非锁定队列并从中读取直到 它是空的,然后删除它并寻找下一个。

所以我的任务是将消息作为包处理,我了解如何在一个队列中使用相同的键来生成和使用,但找不到如何使用队列池。

我们可以让多个生产者和消费者并行运行,但不管它们中的哪一个发送给谁。 我们不需要也永远无法将特定的生产者与特定的消费者联系起来。

一般任务:我们有很多客户端接收推送通知,我们通过一些参数将推送分组以便稍后作为组处理,所以这样的组应该在 RabbitMQ 中的一个队列中进行生成和作为一个组消费,但每个组都独立于其他组。

非常感谢 Hannu 的帮助: 他的简单而强大的解决方案的关键思想是我们可以有一个已知名称的持久队列,生产者将写入创建队列的名称,消费者将读取这些名称从那里开始。

为了使他的解决方案更具可读性和更容易工作,在我的个人任务中,我将生产者中的 publish_data() 分为两个函数 - 一个创建随机队列并将其写入 control_queue 另一个接收这个 random_queue 并用消息填充它。类似的想法对消费者有好处 - 一个处理队列的函数,另一个将被调用来处理消息本身。

【问题讨论】:

  • 您有多个生产者和消费者应用程序吗? “另一种可能的变体”也可以接受吗?这就是我可能会这样做的方式,或者是它的一种变体。
  • Hannu,我更新了问题的答案。
  • 谢谢。我发布了一个候选答案。它的“kombuness”并没有闪耀,但它可以工作,并且至少会给出如何解决这个问题的想法。

标签: python rabbitmq message-queue kombu


【解决方案1】:

我已经做过类似的事情,但使用 Pika。我必须为示例清理和整理旧代码 sn-p。它可能不是很复杂(这绝对是我使用它编写的第一个代码 sn-p),但这就是我解决它的方法。基本上我会设置一个具有已知名称的控制队列。

发布者将为一组消息创建一个随机队列名称,将 N 条消息转储到其中(在我的情况下编号为 1-42),然后将队列名称发布到控制队列。消费者然后接收此队列名称,绑定到它,读取消息直到队列为空,然后删除队列。

这让事情变得相对简单,因为发布者不需要弄清楚他们可以在哪里发布他们的数据组(每个队列都是随机名称的新队列)。接收者不需要担心超时或“全部完成”消息,因为只有当一组数据已写入队列并且每条消息都在那里等待时,接收者才会收到队列名称。

也没有必要修补锁或信号或任何会使事情复杂化的东西。您可以拥有任意数量的消费者和生产者。当然,使用交换和路由密钥可能会有不同的消费者集用于不同的任务等。

出版商

from kombu import Connection
import uuid
from time import sleep
def publish_data(conn):
    random_name= "q" + str(uuid.uuid4()).replace("-", "")
    random_queue = conn.SimpleQueue(random_name)
    for i in xrange(0, 42):
        random_queue.put(i)
    random_queue.close()
    return random_name


with Connection('amqp://guest:guest@localhost:5672//') as conn:
    control_queue = conn.SimpleQueue('control_queue')
    _a = 0
    while True:
        y_name = publish_data(conn)
        message = y_name
        control_queue.put(message)
        print('Sent: {0}'.format(message))
        _a += 1
        sleep(0.3)
        if _a > 20:
            break

    control_queue.close()

消费者

from Queue import Empty

from kombu import Connection, Queue


def process_msg(foo):
    print str(foo)
    with Connection("amqp://guest:guest@localhost:5672//") as _conn:
        sub_queue = _conn.SimpleQueue(str(foo))
        while True:
            try:
                _msg = sub_queue.get(block=False)
                print _msg.payload
                _msg.ack()
            except Empty:
                break
        sub_queue.close()
        chan = _conn.channel()
        dq = Queue(name=str(foo), exchange="")
        bdq = dq(chan)
        bdq.delete()


with Connection('amqp://guest:guest@localhost:5672//') as conn:
    rec = conn.SimpleQueue('control_queue')
    while True:
        msg = rec.get(block=True)
        entry = msg.payload
        msg.ack()
        process_msg(entry)

【讨论】:

  • 非常感谢!我理解您的想法并对其进行了简要测试,到目前为止看起来不错。我会在几天内对其进行更多测试并接受。
  • 当然还有很多其他方法可以做到这一点,但我喜欢这个,因为它不需要任何信令、同步或控制结构,除了使用控制通道。这将使 Rabbit 为您完成所有的日程安排、分发和消息传递,并且它做得很好,因为它正是为此而设计的。唯一的通用接口是控制通道。不需要双向流量,也不需要对等进程之间的通信。一切都可以而且应该保持独立,并且通过消息确认,rabbit 还免费为您提供故障安全机制。
  • 是的,这段代码有效,使用 control_queue 传输队列名称的主要思想很棒,这是我错过的问题的关键。为了使解决方案在未来更具可读性和易用性,我将生产者中的 publish_data() 分为两个函数 - 一个创建随机队列并将其写入 control_queue,另一个接收此 random_queue 并用消息填充它。类似的想法对消费者有好处 - 一个处理队列的函数,另一个将被调用来处理消息本身。
  • 当然。将控制队列处理和消息队列处理分开是有意义的。也可能适用于线程。上面的代码只是一个概念证明。
猜你喜欢
  • 2018-10-20
  • 1970-01-01
  • 1970-01-01
  • 2013-07-05
  • 2022-08-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-04-07
相关资源
最近更新 更多