【问题标题】:Python: Wait on all of `concurrent.futures.ThreadPoolExecutor`'s futuresPython:等待所有`concurrent.futures.ThreadPoolExecutor`的期货
【发布时间】:2014-02-04 06:15:34
【问题描述】:

我已经给concurrent.futures.ThreadPoolExecutor 分配了一堆任务,我想等到它们都完成后再继续流程。我怎样才能做到这一点,而不必保存所有期货并在它们上调用wait? (我想对执行者采取行动。)

【问题讨论】:

  • 执行者不知道何时执行完毕,即Future对象的域。你能进一步解释为什么你不想使用Future 方法吗?有许多不同的方法可以做到这一点(其中一种是wait,正如你所指出的)。
  • @roippi 实际上确实如此。看我的回答。然而,这可能不是 OP 所期望的。

标签: python concurrency future


【解决方案1】:

只需拨打Executor.shutdown

shutdown(wait=True)

通知执行者它应该释放它所拥有的任何资源 当当前挂起的期货完成执行时使用。来电 到Executor.submit()Executor.map() 关机后将 提高RuntimeError.

如果等待是True,那么这个方法将不会返回,直到所有未决的期货都完成 执行完毕,与执行器相关的资源已被释放。

但是,如果您在列表中跟踪您的未来,那么您可以避免使用 futures.wait() 函数关闭执行程序以备将来使用:

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待Future 实例(可能由不同的 Executor 实例)由fs 完成。返回一个命名的 2 元组 套。第一个集合,名为 done,包含期货 在等待完成之前完成(完成或被取消)。这 第二组名为 not_done,包含未完成的期货。

请注意,如果您不提供timeout,它会等到所有期货都完成。

您也可以改用futures.as_completed(),但是您必须对其进行迭代。

【讨论】:

  • 所以你的意思是,如果我使用执行器作为上下文管理器,那么在套件完成后,它会等到所有期货都完成,即使有 10,000 个并且只有 10 个工人?
  • @RamRachum 是的。从文档中:“如果您使用 with 语句,您可以避免显式调用此方法,这将关闭 Executor(等待就像调用 Executor.shutdown() 并设置为 True):”。跨度>
  • shutdown(wait=True) 是否也等待未来的回调?
  • 如果你使用with语句,那么shutdown会被自动调用。
  • @ospider 根据我的经验,执行器通常不用作上下文管理器,因为它用于代码的不同部分。
【解决方案2】:

Bakuriu 的回答是正确的。只是为了扩展一点。众所周知,上下文管理器具有__enter____exit__ 方法。 class Executor(ThreadPoolExecutor 的基类) 是这样定义的

class Executor(object):

    # other methods

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False

真正定义shutdown方法的是ThreadPoolExecutor

class ThreadPoolExecutor(_base.Executor):
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()

【讨论】:

  • 顾名思义,这会产生完成后的结果,即它不会按照 OP 的要求等待整个任务池完成。
  • @roippi 如果您知道有多少任务,为什么不计算已完成任务的数量呢?但话虽如此,不能保证程序知道并发任务的数量。
  • @roippi 完成和完成有什么区别?
【解决方案3】:

如前所述,可以使用Executor.shutdown(wait=True),但还要注意文档中的以下注释:

如果您使用with 语句,您可以避免显式调用此方法,这将关闭Executor(等待Executor.shutdown()@987654326 一起调用@ 设置为True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

【讨论】:

    猜你喜欢
    • 2020-10-31
    • 1970-01-01
    • 2015-06-03
    • 2023-03-16
    • 1970-01-01
    • 2019-11-14
    • 1970-01-01
    • 2019-10-29
    • 2014-06-17
    相关资源
    最近更新 更多