【Question title】:Python multiprocessing queue size keeps growing
【Posted】:2022-12-07 22:15:09
【Question】:

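I have implemented a process pool using concurrent.futures.ProcessPoolExecutor, but I noticed that when I print pool._queue_count, it keeps growing every time I submit a new work item to the pool. Why does it do this, and is it going to be a problem?

Here is the output I am currently logging: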

2022-12-06 15:37:31,934 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,934 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,935 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,935 - DEBUG | Pool queue size: 329
2022-12-06 15:37:31,935 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 15:37:31,935 - DEBUG | Pool pending work: {328: <concurrent.futures.process._WorkItem object at 0x7f247f7be2e0>}
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,947 - DEBUG | Pool queue size: 330
2022-12-06 15:37:41,947 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 15:37:41,947 - DEBUG | Pool pending work: {329: <concurrent.futures.process._WorkItem object at 0x7f247f7be6a0>}

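Notice that the pool queue size is now reported as 330, but I don't understand what that means or why it is so high. For some reason it increases by one every time.

I can't paste all of the code because there is a lot of it, but here is a slightly condensed version, with some snippets that I don't think are relevant cut out: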

futures = []
with mp.Manager() as manager:
    last_execution = time.perf_counter()
    pool = ProcessPoolExecutor()
    while True:
        current_time = time.perf_counter()
        if current_time - last_execution < 10 and not first_run:
            time.sleep(1)
        else:
            last_execution = current_time
            for automation_file in automation_files:
                with open(automation_file, "r") as f:
                    automation_config = json.load(f)
                automation_name = os.path.splitext(os.path.basename(automation_file))[0]
                automation_log = os.path.join(log_dir, f"{automation_name}.log")
                automation_type = automation_config["type"]
                if automation_type == "task":
                    automation = pyba.AutomationTask(automation_name, automation_config, automation_log, api_1, api_2)
                else:
                    logger.error(f"Unknown automation type in '{os.path.basename(automation_file)}', skipping")
                    continue
                logger.debug(f"Running automation '{automation.name}' with internal automation id '{automation._id}'")
            future = pool.submit(automation.run, args=(session_1, session_2, stop_app_event))
            futures.append(future)
            logger.debug(f"Pool queue size: {pool._queue_count}")
            logger.debug(f"Pool processes: {pool._processes}")
            logger.debug(f"Pool pending work: {pool._pending_work_items}")

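Basically, we get a bunch of automation files, parse them, and then run them in new processes using the process pool. Then we wait for a given interval (10 seconds here, for testing) and do exactly the same thing again.

However, right now there is nothing for these automation processes to actually process, because I am testing and haven't created any test records for them... so I don't understand how the queue size can get so large over time.

The number of CPUs on my test server is 2, so there should only be two processes in the pool?

I don't think memory or CPU is a problem here: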

-bash-4.2$ ps aux | head -1; ps aux | grep -iE 'python3.9|19104' | grep -v grep | sort -rnk 4
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
admin    19114  0.0  0.0 225584 15648 pts/1    S+   14:42   0:00 python3.9 app.py
admin    19113  0.0  0.0 225584 15612 pts/1    S+   14:42   0:00 python3.9 app.py
admin    19107  0.0  0.0 520492 15376 pts/1    Sl+  14:42   0:01 python3.9 app.py
admin    19104  0.0  0.0 374080 20248 pts/1    Sl+  14:42   0:02 python3.9 app.py

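One last thing to mention: I have implemented a graceful stop using signals. When I send the app a stop signal, it stops gracefully almost immediately, which suggests that it is not doing any processing despite the queue count being so large. That really adds to the confusion: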

2022-12-06 16:16:05,505 - DEBUG | Pool queue size: 560
2022-12-06 16:16:05,506 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 16:16:05,506 - DEBUG | Pool pending work: {559: <concurrent.futures.process._WorkItem object at 0x7f247f738160>}
2022-12-06 16:16:12,516 - DEBUG | Received a signal to stop the app, setting the stop flag
2022-12-06 16:16:12,516 - DEBUG | Cancelling all scheduled pending work
2022-12-06 16:16:12,518 - DEBUG | Shutting down the process pool
2022-12-06 16:16:12,522 - DEBUG | Process pool shut down successfully, app stopped

【Comments】:

    Tags: python python-3.x multiprocessing python-multiprocessing


    【Answer 1】:

    _queue_count is just a sequential work item ID, and it will never decrease.

    You shouldn't be reading it manually anyway (that's what the leading underscore in its name means!).
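To illustrate the point, here is a minimal sketch of counting outstanding work through the public Future API instead of the private counter. The names noop, count_outstanding and demo are made up for this example, and reading pool._queue_count relies on a CPython implementation detail that may change between versions.

```python
from concurrent.futures import ProcessPoolExecutor, wait


def noop(i):
    """Trivial task standing in for automation.run (hypothetical)."""
    return i


def count_outstanding(futures):
    """Count futures that have not finished yet, using only the public API."""
    return sum(1 for f in futures if not f.done())


def demo():
    futures = []
    with ProcessPoolExecutor(max_workers=2) as pool:
        for i in range(5):
            futures.append(pool.submit(noop, i))
        wait(futures)  # block until every submitted task has completed
        # Private attribute (CPython implementation detail): it counts
        # submissions so far and is not decremented when work completes.
        submitted = pool._queue_count
        outstanding = count_outstanding(futures)
    return submitted, outstanding


if __name__ == "__main__":
    submitted, outstanding = demo()
    print(f"submitted so far: {submitted}, still outstanding: {outstanding}")
```

In a long-running loop like the one in the question, logging count_outstanding(futures) would show the real backlog, and dropping finished futures from the list now and then keeps the list itself from growing without bound.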

    【Comments】:
