【问题标题】:Python Kombu - blockingPython Kombu - 阻塞
【发布时间】:2014-05-30 04:36:56
【问题描述】:

我正在使用 kombu 通过生产者/消费者模型来管理 RabbitMQ。我启动了我的生产者,它在一个队列中放置了 100 个作业(我只有一个队列和一个交换器)。我想同时启动多个消费者,并让每个消费者一次处理一项工作。不幸的是,消费者相互阻塞(即,当一个消费者从队列中获取工作时,其他消费者只是闲置)。如果我杀死了正在工作的消费者,那么其他消费者之一就会介入并开始工作。有没有办法让所有消费者同时运行,每个消费者处理队列中的不同作业?我的消费者代码如下:

def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]

        callbacks.append(self._callback)
        queues.append(self.incoming_queue)

        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()

        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message 

【问题讨论】:

    标签: python multithreading rabbitmq pika kombu


    【解决方案1】:

    您需要将消费者预取设置为 1 (https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos),这样每个消费者将只获取 1 条消息,并将其余消息保留在队列中并保持状态就绪,因此如果您有 2 个消费者的 QOS 设置为1 并且您有 100 条消息,您将同时处理 2 个任务。

    我已将缺少的部分添加到您的代码中,以设置预取计数

    def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]
    
        callbacks.append(self._callback)
        queues.append(self.incoming_queue)
    
        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()
    
        channel = self.rabbitmq_connection.channel()
        channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
    
        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message 
    

    【讨论】:

      【解决方案2】:

      我认为您实际上是在尝试自己重写 Celery:

      http://www.celeryproject.org/

      除非您只是为了学习目的而这样做,否则请避免痛苦并使用 Celery。顺便说一句,kombuRabbitMQ 正是 C​​elery 使用的后端(更不用说 Redis 后端可用,这在某些应用程序中为我节省了数小时的工作量)。

      【讨论】:

      • 实际上不是,celery 有助于分布式任务运行的特定用例。这很擅长。但是 OP 可能正在尝试(并在此过程中滥用“工作”一词)做其他事情,例如使用 amqp 消息队列在不同服务之间进行有序、持久和最终一致的事件驱动通信。或类似的东西。 Celery 不是为 Ordered 事件驱动架构而构建的。..
      • @alonisser - 确实,Celery 还不够
      猜你喜欢
      • 2021-07-15
      • 2011-07-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多