【发布时间】:2015-03-31 11:18:25
【问题描述】:
我正在构建一个系统,其中生产者发送要排队的任务列表,这些任务将被许多消费者使用。
假设我有一个任务列表,它们可以分为黑色、橙色和黄色。所有黑色任务被发送到 Queue_0,橙色到 Queue_1,黄色到 Queue_2。我将为每个队列分配一个工作人员(即:Consumer_0 到 Queue_0,Consumer_1 到 Queue_1,Consumer_2 到 Queue_2)。如果黑名单变大,我想在 Queue_0 中添加一个额外的 Consumer(即 Consumer_3)来帮助 Consumer_0。
我浏览了 Worker Queues 和 Routing 上的 RabbitMQ 教程。我认为路由会解决我的问题。我启动了三个终端,一个生产者和两个消费者,它们将接收 Black 任务。当生产者发送几个黑色任务(Black_Task_1, Black_Task_2)时,两个消费者都收到了两条消息(即:Consumer_0 收到 Black_Task_1 和 Black_Task_2,Consumer_3 也收到 Black_Task_1 和 Black_Task_2)。我希望我的消费者分担任务,而不是做同样的任务。例如,Consumer_0 执行 Black_Task_1,而 Consumer_3 执行 Black_Task_2。我可以实现哪些配置?
==============================
更新
这是取自 RabbitMQ,路由教程的示例代码。我稍微修改了一下。请注意,此代码不会发送黑色、橙色或黄色队列。但是这个概念是存在的。
emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
receive_logs_direct.py
#!/usr/bin/env python
import pika
import sys
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
time.sleep(1)
print " [x] Done"
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=queue_name)
channel.start_consuming()
制片人
nuttynibbles$ ./4_emit_log_direct.py info "run run info"
[x] Sent 'info':'run run info'
Consumer_0
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done
Consumer_3
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done
【问题讨论】:
-
你是什么意思共享任务 - 你想让两个消费者处理同一个任务?
-
嗨,我在上面的问题中进一步解释了。
标签: python-2.7 rabbitmq