【问题标题】:Generator function causing exceptions to be caught after all processes complete生成器函数导致在所有进程完成后捕获异常
【发布时间】:2019-12-09 16:40:21
【问题描述】:

我写了这个简短的 POC 来帮助理解我遇到的问题,希望有人可以向我解释发生了什么以及如何解决它和/或提高它的效率。

我使用迭代器、itertools 和生成器的目标是因为我不想在内存中存储一​​个巨大的列表,因为我扩大列表将变得难以管理,并且我不想遍历整个列表每次都做某事。请注意,我对生成器、迭代器和多处理的概念相当陌生,并且今天编写了这段代码,所以,如果你能清楚地告诉我想念理解这些东西应该如何工作的工作流程,请教育我并帮助我代码更好。

您应该能够按原样运行代码并查看我面临的问题。我期待一旦捕获到异常,它就会被引发并且脚本终止,但是我看到正在发生的事情,异常被捕获但其他进程继续。

如果我注释掉generateRange 生成器并创建一个虚拟列表并将其传递给futures = (map(executor.submit, itertools.repeat(execute), mylist)),则异常会被捕获并按预期退出脚本。

我的猜测是,生成器/迭代器必须在脚本终止之前完成生成范围,据我了解,情况并非如此。

我选择使用生成器函数/迭代器的原因是您只能在需要时访问它们。

有没有办法让我停止生成器继续并让异常适当地引发。

这是我的 POC:

import concurrent.futures

PRIMES = [0]*80

import time

def is_prime(n):
    print("Enter")
    time.sleep(5)
    print("End")
    1/0

child = []
def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        for i in PRIMES:
            child.append(executor.submit(is_prime, i))
        for future in concurrent.futures.as_completed(child):
            if future.exception() is not None:
                print("Throw an exception")
                raise future.exception()

if __name__ == '__main__':
    main()

编辑:我用更简单的东西更新了 POC。

【问题讨论】:

  • 您要并行化的东西主要受 CPU 速度或 I/O 限制吗?如果是后者,多处理不是一个好主意。
  • 线程很难正确处理;-(
  • 如果真的是素数检测,则必须使用多处理。如果是I/O,我会推荐trio;它是极少数能够正确处理异常的并发解决方案之一。
  • 这只是一个 POC,不是我实际在做的@L3viathan
  • 真的不可能像这样取消正在运行的进程。您可以做的是遍历所有包含期货的child.cancel(),但无论如何,一些进程仍将执行。虽然不是完整的 80。

标签: python-3.x iterator multiprocessing generator itertools


【解决方案1】:

不可能立即取消正在运行的期货,但这至少使得在引发异常后只有少数进程运行:

import concurrent.futures                                                  

PRIMES = [0]*80                                                            

import time                                                                

def is_prime(n):                                                           
    print("Enter")                                                         
    time.sleep(5)                                                          
    print("End")                                                           
    1/0                                                                    

child = []                                                                 
def main():                                                                
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        for i in PRIMES:                                                   
            child.append(executor.submit(is_prime, i))                     
        for future in concurrent.futures.as_completed(child):              
            if future.exception() is not None:                             
                for fut in child:                                          
                    fut.cancel()                                           
                print("Throw an exception")                                
                raise future.exception()                                   

if __name__ == '__main__':                                                 
    main()                                                                 

【讨论】:

  • 评论是关键,虽然我以前有过,但在这里似乎可以工作,很好的接触,谢谢。
猜你喜欢
  • 2020-03-11
  • 1970-01-01
  • 2014-01-27
  • 2018-06-19
  • 2016-06-16
  • 2010-09-12
  • 1970-01-01
  • 1970-01-01
  • 2015-12-14
相关资源
最近更新 更多