【发布时间】:2017-05-14 12:27:53
【问题描述】:
我正在使用来自 RabbitMQ 通道的消息,我希望我可以一次使用 n 个元素。我想我可以使用 ProcessPoolExecutor(或 ThreadPoolExecutor)。 我只是想知道是否有可能知道池中是否有免费的执行者。
这就是我要写的:
executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
print "actually consuming a single message"
def on_message(channel, method_frame, header_frame, message):
# this method is called once per incoming message
future = executor.submit(consume, message)
block_until_a_free_worker(executor, future)
def block_until_a_free_worker(executor, future):
running.append(future) # this grows forever!
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()
我需要编写函数block_until_a_free_worker。 这种方法应该能够检查是否所有正在运行的工作人员都在使用中。
如果可用,我可以使用任何阻塞的 executor.submit 选项。
我尝试了一种不同的方法,并在它们完成的同时更改了期货列表。 我尝试从列表中显式添加和删除期货,然后像这样等待:
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
这似乎不是一个解决方案。
我可以设置一个future.add_done_callback,并可能计算正在运行的实例...
任何提示或想法? 谢谢。
【问题讨论】:
-
可能是一个基于 multiprocessing.Pool 和 Semaphore 的解决方案,使用工人数量进行初始化:stackoverflow.com/questions/9601802/…
标签: python rabbitmq multiprocessing pika concurrent.futures