【问题标题】:RabbitMQ - combine Work Queues and Routing QueuesRabbitMQ - 结合工作队列和路由队列
【发布时间】:2015-03-31 11:18:25
【问题描述】:

我正在构建一个系统,其中生产者发送要排队的任务列表,这些任务将被许多消费者使用。

假设我有一个任务列表,它们可以分为黑色、橙色和黄色。所有黑色任务被发送到 Queue_0,橙色到 Queue_1,黄色到 Queue_2。我将为每个队列分配一个工作人员(即:Consumer_0 到 Queue_0,Consumer_1 到 Queue_1,Consumer_2 到 Queue_2)。如果黑名单变大,我想在 Queue_0 中添加一个额外的 Consumer(即 Consumer_3)来帮助 Consumer_0。

我浏览了 Worker QueuesRouting 上的 RabbitMQ 教程。我认为路由会解决我的问题。我启动了三个终端,一个生产者和两个消费者,它们将接收 Black 任务。当生产者发送几个黑色任务(Black_Task_1, Black_Task_2)时,两个消费者都收到了两条消息(即:Consumer_0 收到 Black_Task_1 和 Black_Task_2,Consumer_3 也收到 Black_Task_1 和 Black_Task_2)。我希望我的消费者分担任务,而不是做同样的任务。例如,Consumer_0 执行 Black_Task_1,而 Consumer_3 执行 Black_Task_2。我可以实现哪些配置?

==============================

更新

这是取自 RabbitMQ,路由教程的示例代码。我稍微修改了一下。请注意,此代码不会发送黑色、橙色或黄色队列。但是这个概念是存在的。

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
    time.sleep(1)
    print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=queue_name)

channel.start_consuming()

制片人

nuttynibbles$ ./4_emit_log_direct.py info "run run info"
 [x] Sent 'info':'run run info'

Consumer_0

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

Consumer_3

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

【问题讨论】:

  • 你是什么意思共享任务 - 你想让两个消费者处理同一个任务?
  • 嗨,我在上面的问题中进一步解释了。

标签: python-2.7 rabbitmq


【解决方案1】:

我认为您的基本问题在于:

如果黑色 lists 队列变大,我想添加一个额外的消费者(即: Consumer_3) 到 Queue_0 以帮助 Consumer_0。

只要您将另一个消费者添加到队列中 - 它就会获取下一条可用消息。

如果第一个消费者没有acknowledge the message;然后多个工作人员将能够处理同一条消息,因为它将保留在队列中。

因此,请确保您正确地确认消息:

默认情况下,RabbitMQ 会将每条消息发送给下一个消费者,在 顺序。平均每个消费者将获得相同数量的 消息。这种分发消息的方式称为循环。 [...] 没有任何消息超时; RabbitMQ 将重新传递消息 仅当工作人员连接终止时。即使处理一个也没关系 消息需要非常非常长的时间。

根据任务的性质,您可以通过创建优先级队列在多个进程之间拆分工作; C1(消费者)使用它来获取额外的资源。在这种情况下,您必须让工作人员准备好并在单独的优先级队列上进行侦听;从而创建一个子队列,其中正在处理 T1(一个任务)。

但是,除此之外,初始的C1 必须通过确认任务的接收来确保任务不再可用。

【讨论】:

  • 嗨,我添加了我的代码 sn-ps。我不确定我是否做得正确。在receive_logs_direct.py 中,我在callback() 中添加了basic_ack()。但仍无法按预期工作
【解决方案2】:

我认为您的问题是您正在为每个消费者创建一个新队列。当你打电话时

结果 = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

在你的消费者中,这声明了一个新队列,告诉 RabbitMQ 为它生成一个唯一的名称,并将它标记为由调用它的消费者中的通道独占使用。这意味着每个消费者都有自己的队列。

然后,您使用严重性作为路由键将每个新队列绑定到交换器。当消息进入直接 Exchange 时,RabbitMQ 会将其副本路由到与匹配路由键绑定的每个队列。队列中没有循环。每个消费者都将获得消息的副本,这就是您正在观察的内容。

我相信您想要做的是让每个消费者对队列使用相同的名称,在 queue_declare 中指定名称,并且不要使其独占。然后所有的消费者都将监听同一个队列。消息将以循环方式传递给其中一个消费者。

生产者(emit_log.py 程序)不声明或绑定队列——它不是必须的,但是如果在发送消息之前没有建立绑定,它将被丢弃。如果您使用的是固定队列,您也可以让生产者设置它,只需确保使用与消费者相同的参数(例如 queue_name)。

【讨论】:

    猜你喜欢
    • 2018-10-18
    • 1970-01-01
    • 2017-03-30
    • 1970-01-01
    • 1970-01-01
    • 2016-07-06
    • 1970-01-01
    • 1970-01-01
    • 2021-06-14
    相关资源
    最近更新 更多