【问题标题】:how can I control max parallelism with guaranteed FIFO?如何通过保证 FIFO 控制最大并行度?
【发布时间】:2017-01-23 14:49:49
【问题描述】:

我正在研究使用 rabbitmq 来管理我的应用程序的事件。更具体地说,我想:

  1. 确保对每个队列的事件进行 FIFO 处理:在所有先前的事件都已完全处理之前,不会处理新事件。
  2. 确保我可以控制并行执行的事件数量。

一个典型的例子是我有 200 到 800 个队列,我不想让超过 8 个并行工作人员。

我决定使用 n+1 个队列和 n+m 个工人(n=200 到 800 和 m=8):

  • 第一类工人 (n) 负责确保 FIFO 队列中的所有事件
  • 第二种工人(m)只是以并行方式执行事件

这是伪代码:

def queues_declare(channel):
    channel.queue_declare(queue='type1', durable=True)
    channel.queue_declare(queue='type1_callback', durable=True)
    channel.queue_declare(queue='type2', durable=True)

def type1(channel):
    def callback_type1(ch, method, properties, body):
        channel.basic_publish(exchange='',
                              routing_key='type2',
                              body=body,
                              properties=pika.BasicProperties(
                                  reply_to = "type1_callback",
                                  correlation_id = method.delivery_tag,
                                  delivery_mode = 2,
                              ))
    def callback_type1_callback(ch, method, properties, body):
        ch.basic_ack(delivery_tag = properties.correlation_id)
        ch.basic_ack(delivery_tag = method.delivery_tag)

    queues_declare(channel)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback_type1,
                          queue='type1')
    channel.basic_consume(callback_type1_callback,
                          queue='type1_callback')

def type2(channel):
    queues_declare(channel)

    def callback_type2(ch, method, properties, body):
        # XXX: do work !
        channel.basic_publish(exchange='',
                              routing_key=properties.reply_to,
                              body='',
                              properties=pika.BasicProperties(
                                  correlation_id = properties.correlation_id,
                              ))
        ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_consume(callback_type2,
                          queue='type2')

所以,我的问题是:这是用 rabbitmq 实现我想要的正确方法吗?有没有更好的方法来控制并行性并确保 FIFO 处理?

【问题讨论】:

    标签: python rabbitmq


    【解决方案1】:

    这里有几个问题。

    1. 保证 FIFO 顺序的唯一方法是使用单个队列进行序列化访问。并且使用大量仅将消息重新发布到该单个队列的工作人员实际上会稍微放松一下保证-因此最好以消息将直接到达该队列的方式设置消息传递结构。 无论如何,最大的缺点是您的性能受 CPU 单核性能的约束。

    2. 有一种方法可以仅使用 RabbitMQ 本身来限制并发。为此,您需要创建一个单独的队列,并使用与您所需的并发级别相等的消息量预先填充它。然后,您的工作人员应该做的第一件事是尝试获取该消息,但不要确认它-因此该消息将在工作人员的整个生命周期中以这种非确认状态挂起。当工人死亡(或只是关闭 AMQP 连接)时,任何其他工人都可以访问该消息以获取它。但同样,这里也有一个缺点——这只能在非集群环境中可靠地工作。例如。请参阅https://aphyr.com/posts/315-jepsen-rabbitmq,其中几乎正是在测试这个用例等等。

    【讨论】:

      猜你喜欢
      • 2016-11-09
      • 2019-07-11
      • 2011-11-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多