【发布时间】:2017-01-23 14:49:49
【问题描述】:
我正在研究使用 rabbitmq 来管理我的应用程序的事件。更具体地说,我想:
- 确保对每个队列的事件进行 FIFO 处理:在所有先前的事件都已完全处理之前,不会处理新事件。
- 确保我可以控制并行执行的事件数量。
一个典型的例子是我有 200 到 800 个队列,我不想让超过 8 个并行工作人员。
我决定使用 n+1 个队列和 n+m 个工人(n=200 到 800 和 m=8):
- 第一类工人 (n) 负责确保 FIFO 队列中的所有事件
- 第二种工人(m)只是以并行方式执行事件
这是伪代码:
def queues_declare(channel):
channel.queue_declare(queue='type1', durable=True)
channel.queue_declare(queue='type1_callback', durable=True)
channel.queue_declare(queue='type2', durable=True)
def type1(channel):
def callback_type1(ch, method, properties, body):
channel.basic_publish(exchange='',
routing_key='type2',
body=body,
properties=pika.BasicProperties(
reply_to = "type1_callback",
correlation_id = method.delivery_tag,
delivery_mode = 2,
))
def callback_type1_callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = properties.correlation_id)
ch.basic_ack(delivery_tag = method.delivery_tag)
queues_declare(channel)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_type1,
queue='type1')
channel.basic_consume(callback_type1_callback,
queue='type1_callback')
def type2(channel):
queues_declare(channel)
def callback_type2(ch, method, properties, body):
# XXX: do work !
channel.basic_publish(exchange='',
routing_key=properties.reply_to,
body='',
properties=pika.BasicProperties(
correlation_id = properties.correlation_id,
))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback_type2,
queue='type2')
所以,我的问题是:这是用 rabbitmq 实现我想要的正确方法吗?有没有更好的方法来控制并行性并确保 FIFO 处理?
【问题讨论】: