【问题标题】:store results ThreadPoolExecutor存储结果 ThreadPoolExecutor
【发布时间】:2019-02-04 13:13:19
【问题描述】:

我对“concurrent.futures”的并行处理相当陌生,我正在测试一些简单的实验。我编写的代码似乎可以工作,但我不确定如何存储结果。我试图创建一个列表(“期货”)并将结果附加到该列表中,但这大大减慢了过程。我想知道是否有更好的方法来做到这一点。谢谢。

import concurrent.futures
import time

couple_ods= []
futures=[]

dtab={}
for i in range(100):
    for j in range(100):
       dtab[i,j]=i+j/2
       couple_ods.append((i,j))

avg_speed=100
def task(i):
    origin=i[0]
    destination=i[1]
    time.sleep(0.01)
    distance=dtab[origin,destination]/avg_speed
    return distance
start1=time.time()
def main():
    with concurrent.futures.ThreadPoolExecutor() as executor:
       for number in couple_ods:
          future=executor.submit(task,number)
          futures.append(future.result())

if __name__ == '__main__':
    main()
end1=time.time()

【问题讨论】:

    标签: python python-multithreading concurrent.futures


    【解决方案1】:

    当您调用 future.result() 时,它会阻塞,直到值准备好。因此,您不会从这里的并行性中获得任何好处——您开始一项任务,等待它完成,开始另一项任务,等待它完成,等等。

    当然,您的示例首先不会从线程中受益。你的任务除了 CPU 绑定的 Python 计算什么都不做,这意味着(至少在 CPython、MicroPython 和 PyPy,它们是 concurrent.futures 附带的唯一完整实现),GIL(全局解释器锁)将阻止更多比你的一个线程一次进步。

    希望您的真实程序有所不同。如果它正在做 I/O 绑定的事情(发出网络请求、读取文件等),或者使用像 NumPy 这样的扩展库来释放 GIL 以解决繁重的 CPU 工作,那么它会正常工作。但除此之外,您需要在此处使用 ProcessPoolExecutor


    无论如何,您要做的就是将future 本身附加到一个列表中,这样您就可以在等待它们之前获得所有期货的列表:

    for number in couple_ods:
        future=executor.submit(task,number)
        futures.append(future)
         
    

    然后,在您开始所有作业后,您就可以开始等待它们了。有三个简单的选项,当您需要更多控制时,还有一个复杂的选项。


    (1) 您可以直接循环它们以按提交顺序等待它们:

    for future in futures:
        result = future.result()
        dostuff(result)
    

    (2) 如果您需要等待它们全部完成后再进行任何工作,您可以拨打wait

    futures, _ = concurrent.futures.wait(futures)
    for future in futures:
        result = future.result()
        dostuff(result)
    

    (3) 如果您想在每个准备就绪后立即处理,即使它们出现故障,请使用as_completed

    for future in concurrent.futures.as_completed(futures): 
        dostuff(future.result())
    

    请注意,文档中使用此函数的示例提供了一些方法来确定完成了哪个任务。如果你需要,它可以很简单,只需将每个索引传递一个索引,然后 return index, real_result,然后你可以 for index, result in … 进行循环。

    (4) 如果您需要更多控制权,您可以循环访问waiting,了解目前为止所做的任何事情:

    while futures:
        done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
        for future in done:
            result = future.result()
            dostuff(result)
    

    该示例与as_completed 执行相同的操作,但您可以对其编写细微的变化来执行不同的操作,例如等待所有内容完成但如果有任何引发异常则提前取消。


    对于很多简单的情况,你可以只使用执行器的map方法来简化第一个选项。这就像内置的 map 函数一样,为参数中的每个值调用一次函数,然后为您提供一些可以循环以相同顺序获取结果的内容,但它是并行执行的。所以:

    for result in executor.map(task, couple_ods):
        dostuff(result)
    

    【讨论】:

    • 我认为答案的as_completed部分有一个小错误,应该是:for future in concurrent.futures.as_completed(futures): dostuff(future.result())
    猜你喜欢
    • 2013-01-06
    • 2018-08-17
    • 2012-06-22
    • 1970-01-01
    • 1970-01-01
    • 2016-01-23
    • 2022-11-25
    • 2017-11-04
    • 1970-01-01
    相关资源
    最近更新 更多