【问题标题】:Using concurrent.futures to consume many dequeued messages a time使用 concurrent.futures 一次消耗许多出列消息
【发布时间】:2017-05-14 12:27:53
【问题描述】:

我正在使用来自 RabbitMQ 通道的消息,我希望我可以一次使用 n 个元素。我想我可以使用 ProcessPoolExecutor(或 ThreadPoolExecutor)。 我只是想知道是否有可能知道池中是否有免费的执行者。

这就是我要写的:

executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
    print "actually consuming a single message"

def on_message(channel, method_frame, header_frame, message):
    # this method is called once per incoming message
    future = executor.submit(consume, message)
    block_until_a_free_worker(executor, future)

def block_until_a_free_worker(executor, future):
    running.append(future) # this grows forever!
    futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)

[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()

我需要编写函数block_until_a_free_worker。 这种方法应该能够检查是否所有正在运行的工作人员都在使用中。

如果可用,我可以使用任何阻塞的 executor.submit 选项。

我尝试了一种不同的方法,并在它们完成的同时更改了期货列表。 我尝试从列表中显式添加和删除期货,然后像这样等待:

futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)

这似乎不是一个解决方案。

我可以设置一个future.add_done_callback,并可能计算正在运行的实例...

任何提示或想法? 谢谢。

【问题讨论】:

标签: python rabbitmq multiprocessing pika concurrent.futures


【解决方案1】:

我给了类似的答案here

信号量的作用是将资源的访问权限限制为一组工作人员。

from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor 

class TaskManager:
    def __init__(self, workers):
        self.pool = ProcessPoolExecutor(max_workers=workers)
        self.workers = Semaphore(workers)

    def new_task(self, function):
        """Start a new task, blocks if all workers are busy."""
        self.workers.acquire()  # flag a worker as busy

        future = self.pool.submit(function, ... )

        future.add_task_done(self.task_done)

    def task_done(self, future):
        """Called once task is done, releases one worker."""
        self.workers.release()

【讨论】:

    猜你喜欢
    • 2014-11-17
    • 2019-11-24
    • 2017-02-08
    • 2014-07-18
    • 2016-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多