【问题标题】:Python ThreadPool with limited task queue size任务队列大小有限的 Python ThreadPool
【发布时间】:2017-03-11 05:35:44
【问题描述】:

我的问题如下:我有一个multiprocessing.pool.ThreadPool 对象和worker_count 工作人员和一个主要的pqueue,我从中将任务提供给池。

流程如下:有一个主循环,从pqueue获取level级别的项目,并使用apply_async将其提交到池中。处理该项目时,它会生成level + 1 的项目。问题是池接受所有任务并按照提交的顺序处理它们。

更准确地说,正在发生的事情是处理 level 0 项目,每个项目生成 100 个 level 1 项目,这些项目立即从 pqueue 检索并添加到池中,每个 level 1 项目产生 100 个 level 2提交到池中的项目等,并以 BFS 方式处理项目。

我需要告诉池不接受超过 worker_count 的项目,以便有机会从 pqueue 检索更高级别的项目,以便以 DFS 方式处理项目。

我目前的解决方案是:对于每个提交的任务,将 AsyncResult 对象保存在 asyncres_list 列表中,然后在从 pqueue 检索项目之前,我删除已处理的项目(如果有),检查如果asyncres_list 的长度小于每 0.5 秒池中的线程数,这样只会同时处理thread_number 项。

我想知道是否有更简洁的方法来实现此行为,我似乎无法在文档中找到一些参数来限制可以提交到池的最大任务数。

【问题讨论】:

  • 如果您可以发布您拥有的代码的Minimal, Complete and Verifiable 示例,可能会有所帮助。处理和回答这样的问题更容易。
  • “将 BFS 更改为 DFS”问题似乎与“限制池的任务队列大小”不同

标签: python multithreading python-3.x threadpool python-multiprocessing


【解决方案1】:

ThreadPool 是用于常见任务的简单工具。如果你想自己管理队列,获得 DFS 行为;您可以直接在 threadingqueue 模块上实现必要的功能。

为了防止在当前任务产生的所有任务都完成之前安排下一个根任务(“DFS”-like order),您可以use Queue.join()

#!/usr/bin/env python3
import queue
import random
import threading
import time

def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
    for task in iter(q.get, None):  # blocking get until None is received
        try:
            if len(task) < maxlevel:
                for i in range(multiplicity):
                    q.put(task + str(i))  # schedule the next level
            time.sleep(random.random())  # emulate some work
            with lock:
                print(task)
        finally:
            q.task_done()

worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
           for _ in range(worker_count)]
for t in threads:
    t.start()

for task in "01234":  # populate the first level
    q.put(task)
    q.join()  # block until all spawned tasks are done
for _ in threads:  # signal workers to quit
    q.put(None)
for t in threads:  # wait until workers exit
    t.join()

代码示例来源于the queue module documentation中的示例。

每个级别的任务都会产生 multiplicity 直接子任务,这些任务会产生自己的子任务,直到达到 maxlevel

None 用于向工人发出他们应该退出的信号。 t.join() 用于等待线程正常退出。如果主线程因任何原因被中断,则守护线程将被终止,除非有其他非守护线程(您可能希望提供 SIGINT 处理程序,以指示工作人员在Ctrl+C 上优雅退出,而不是仅仅死亡)。

queue.LifoQueue() 用于获得“后进先出”顺序(由于多线程,它是近似的)。

maxsize 没有设置,否则工作人员可能会死锁——无论如何你必须把任务放在某个地方。 worker_count 无论任务队列如何,后台线程都在运行。

【讨论】:

    猜你喜欢
    • 2012-06-18
    • 2014-08-12
    • 2012-01-13
    • 1970-01-01
    • 1970-01-01
    • 2017-11-14
    • 2021-11-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多