【Question title】: concurrent.futures.ThreadPoolExecutor.map(): timeout not working
【Posted】: 2018-12-27 11:31:14
【Question】:
import concurrent.futures
import time 

def process_one(i):
    try:                                                                             
        print("dealing with {}".format(i))                                           
        time.sleep(50)
        print("{} Done.".format(i))                                                  
    except Exception as e:                                                           
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: 
        executor.map(process_one,
                range(100),                                                          
                timeout=3)                                                           


if __name__ == '__main__':                                                           
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:                                                           
        print(e)      

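The docs say:

The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map().

But the script above raises no exception and just keeps waiting. Any suggestions?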

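【Comments】:

  • Do you want to kill the hanging jobs, or do you want the whole process_many call to take roughly 3 seconds or less?
  • @arachnivore Kill the hanging jobs and release the threads they occupy.
  • Which Python version?

Tags: python concurrency concurrent.futures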


【Solution 1】:

As we can see in the source (for Python 3.7), map returns a generator iterator, created by calling the inner function result_iterator:

def map(self, fn, *iterables, timeout=None, chunksize=1):
    ...
    if timeout is not None:
        end_time = timeout + time.time()
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            # reverse to keep finishing order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

The TimeoutError is raised from the yield fs.pop().result(end_time - time.time()) call, but you have to request a result from the iterator for that call to be reached.
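A minimal sketch of that behaviour, assuming a hypothetical slow() task and short delays chosen only for illustration: the call to map() returns almost immediately, and the TimeoutError only surfaces once next() is called on the returned iterator.

```python
import concurrent.futures
import time


def slow(x):
    # Hypothetical task that takes longer than the timeout below.
    time.sleep(0.5)
    return x


executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
start = time.monotonic()
results = executor.map(slow, range(2), timeout=0.1)
# map() only submits the tasks; it does not wait for them.
map_returned_after = time.monotonic() - start

try:
    next(results)  # the timeout is checked here, not inside map()
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True

print("map() returned after {:.3f}s, timed out: {}".format(map_returned_after, timed_out))
executor.shutdown(wait=True)
```

Note that the countdown starts when map() is called, so a long pause before the first next() call counts against the timeout.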

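The rationale is that you don't care about the submission itself. The tasks are submitted and start running in background threads. What you care about is the timeout when requesting the results. That is the common use case: you submit tasks and request their results within a limited time, rather than just submitting them and expecting them to terminate within a limited time.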

If the latter is what you want, you can use wait, as shown for example in Individual timeouts for concurrent.futures.
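As a rough sketch of the wait approach (not the code from the linked answer), the example below submits a few tasks with submit() and gives the whole batch a single deadline. The fetch() function and the delays are hypothetical placeholders.

```python
import concurrent.futures
import time


def fetch(delay):
    # Hypothetical stand-in for real work such as downloading a page.
    time.sleep(delay)
    return delay


executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures_list = [executor.submit(fetch, d) for d in (0.1, 0.2, 1.0, 1.0)]

# Block for at most 0.5 seconds in total, then split the futures into two sets.
done, not_done = concurrent.futures.wait(futures_list, timeout=0.5)

results = sorted(f.result() for f in done)
for f in not_done:
    # cancel() only succeeds for tasks that have not started yet;
    # tasks that are already running keep running in their threads.
    f.cancel()

print(results, len(not_done))
executor.shutdown(wait=False)
```

wait() does not raise on timeout; it simply returns whatever has finished so far. Threads that are already running a task cannot be killed this way, so the slow tasks still run to completion in the background.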

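【Comments】:

  • Thanks for pointing me in the right direction. I found the following answer to be the best solution so far: stackoverflow.com/a/44719580/1405762
  • In my case there is a huge pool of URLs and I want to sample as many of them as possible (fetch the content of each page), but I don't mind giving up on slow connections and trying the next one.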
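【Solution 2】:

As the docs specify, the timeout error is only raised when the __next__() method is called on the iterator returned by map. To trigger that call you can, for example, convert the output to a list: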

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    print('main: starting')
    try:
        # without this conversion to a list, the timeout error is not raised
        real_results = list(results) 
    except futures.TimeoutError:  # public name; futures._base is a private module
        print("TIMEOUT")

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5

Here the n-th task sleeps for n seconds, so the timeout fires after task 2 has completed.
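The output above also shows tasks 3 to 5 finishing after TIMEOUT. That is because leaving the with block calls shutdown(wait=True) on the executor, which waits for every running task. It is also why the script in the question keeps waiting even though it never asks for a result. A small timing sketch, with delays chosen only for illustration:

```python
import concurrent.futures
import time


def task(n):
    time.sleep(n)
    return n


start = time.monotonic()
timed_out = False
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    results = ex.map(task, [0.1, 1.0], timeout=0.3)
    try:
        collected = list(results)
    except concurrent.futures.TimeoutError:
        timed_out = True
        timeout_seen_after = time.monotonic() - start
# Leaving the with block joins the worker threads, so we only get here
# once the 1.0 second task has finished.
block_left_after = time.monotonic() - start

print("timeout seen after {:.2f}s".format(timeout_seen_after))
print("with block left after {:.2f}s".format(block_left_after))
```

The timeout limits how long the caller waits for results; it does not limit how long the tasks themselves run.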


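Edit: If you want to terminate the unfinished tasks, you can try the answers in this question (although they don't use ThreadPoolExecutor.map()), or you can simply ignore the return values of the remaining tasks and let them finish: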

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures.TimeoutError:  # public name; futures._base is a private module
        print("TIMEOUT")
    print(outputs)

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5

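【Comments】:

  • task(n) will always run to completion (printing "done with n"). Is there any way to interrupt it when the TimeoutError occurs? I also tried the generator approach, calling __next__() explicitly; same result.
  • @HaoWang I have edited my answer to address this. However, I just realized that the second solution only works if your tasks are ordered by duration, i.e. if later tasks take longer, which makes it quite impractical. I'll try to find something else.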
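
One possible way around the ordering limitation mentioned in the last comment, offered as a sketch and not as the answerer's eventual fix, is to submit the tasks with submit() and iterate with concurrent.futures.as_completed, which yields futures in completion order. The delays below are illustrative, and the slowest task is deliberately submitted first.

```python
import concurrent.futures
import time


def task(n):
    time.sleep(n)
    return n


delays = [1.0, 0.1, 0.2]  # the slowest task is submitted first
outputs = []
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
pending = [executor.submit(task, d) for d in delays]
try:
    # Yields futures as they finish; raises TimeoutError if some are still
    # unfinished 0.5 seconds after as_completed() was called.
    for future in concurrent.futures.as_completed(pending, timeout=0.5):
        outputs.append(future.result())
except concurrent.futures.TimeoutError:
    print("TIMEOUT")
finally:
    executor.shutdown(wait=False)
print(outputs)
```

With map() and the same delays, the first requested result would belong to the 1.0 second task, so the timeout would fire before anything was collected. As before, the slow task is not interrupted; its result is simply ignored.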