【问题标题】:Python get the active process number of multiprocessing poolPython获取多处理池的活动进程号
【发布时间】:2017-10-01 16:10:09
【问题描述】:

我创建了一个带有多处理池的进程池。我有很多任务要处理,但要获得任务的qps并不容易。所以我想获取池的活动进程号,以便我可以设置适当的池大小。这是整个代码:

import time
from multiprocessing import Pool

def do_work(msg):
    # do some work


if __name__ == '__main__':
    consumer = KafkaConsumer(
    group_id=worker_config.kafka_group_id,
    bootstrap_servers=kafka_url,
    auto_offset_reset=worker_config.kafka_reset,
    enable_auto_commit=True)
    consumer.subscribe(topics=worker_config.kafka_topics)

    for message in consumer:
        logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg))
        pool.apply_async(do_work, (message,))
        process_count = number_of_active_process_of_pool
        logging.info("number_of_active_process_number is %d", process_count)


    pool.close()
    pool.join()

【问题讨论】:

    标签: python multiprocessing pool


    【解决方案1】:

    apply_async 给你一个 AsyncResult: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult

    您可以使用.ready() 来确定它是否已完成。通过这种方式,您可以完成任务的数量,并扩展剩余的任务数量。 只要这个数字超过了poolsize,就可以假设poolsize有很多进程正在运行,如果没有,那么剩余的任务量就是正在运行的进程数。

    替代方案:

    如果您不使用 apply_async 而是使用队列,例如 this one,则可以使用 .qsize() 获得大致的队列大小

    还有multiprocessing.active_children,但只有在这些进程结束时才有效,但池不会;除非您将其订购至.join() 所以在你的情况下它会起作用。

    【讨论】:

    • 感谢您的回答。下面列出了整个代码。我收到来自 Kafka 的消息。池大小为8。当消息数大于8时,进程数仍为8,消息存储在进程池中。但是当消息数小于8例如6时,池中的活动进程数为6。我只想知道池中的活动进程数为6。因为当我从kafka收到消息时,我会把它扔到池子里,所以我不在乎这个过程的结果。
    • 因为我不知道所有消息的数量,所以使用.ready()get the amount of tasks done and by extension the amount of tasks left to be done 不适合我的情况。
    • 每当您通过 apply_async 添加任务时,都会将返回的任务对象放入列表中。每当您需要剩余的任务量时,您都会浏览列表并取出所有 ready() 报告完成的结果。然后,您可以 len(thelist) 并获取仍然需要完成的剩余任务。
    猜你喜欢
    • 2013-12-01
    • 1970-01-01
    • 2022-01-08
    • 1970-01-01
    • 1970-01-01
    • 2022-11-30
    • 2015-01-13
    • 1970-01-01
    • 2021-07-24
    相关资源
    最近更新 更多