【问题标题】:Multiprocessing Robust to Occasional Failures多处理对偶发故障的鲁棒性
【发布时间】:2018-10-12 17:21:39
【问题描述】:

我有一个 100-1000 个时间序列路径和一个相当昂贵的模拟,我想并行化它们。但是,我使用的库在极少数情况下会挂起,我想让它对这些问题具有鲁棒性。这是当前设置:

with Pool() as pool:
    res = pool.map_async(simulation_that_occasionally_hangs, (p for p in paths))
    all_costs = res.get()

我知道 get() 有一个 timeout 参数,但如果我理解正确,它适用于 1000 条路径的整个过程。我想做的是检查是否有任何 single 模拟耗时超过 5 分钟(正常路径需要 4 秒),如果是,则停止该路径并继续 get() 其余部分。

编辑:

pebble 中的测试超时

def fibonacci(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fibonacci(n - 1) + fibonacci(n - 2)


def main():
    with ProcessPool() as pool:
        future = pool.map(fibonacci, range(40), timeout=10)
        iterator = future.result()

        all = []
        while True:
            try:
                all.append(next(iterator))
            except StopIteration:
                break
            except TimeoutError as e:
                print(f'function took longer than {e.args[1]} seconds')

        print(all)

错误:

RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
    _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied

【问题讨论】:

    标签: python multiprocessing pool robustness


    【解决方案1】:

    pebble 库旨在解决此类问题。它透明地处理作业超时和故障,例如 C 库崩溃。

    您可以查看documentation 示例以了解如何使用它。它具有与concurrent.futures 类似的界面。

    【讨论】:

    • 看起来像答案,但即使有他们的文档(例如使用他们的斐波那契示例),我也无法将他们的超时与将所有成功的运行拉入类似n = list(future.result()) 的东西结合起来。关于如何修改斐波那契示例以获取结果列表但没有“future.cancel()”的任何建议?
    • 使用第一个例子。在那里它显示了如何在记录失败时提取所有成功的结果。
    • 我根据上面的第一个示例添加了代码。如果您的计算机速度更快,您可能需要做的不仅仅是range(40)。两个问题。 all 只保存最终值,我得到了 RuntimeError: I/O operations still in flight while destroying Overlapped object
    • ok 修复了它以获取 all 中的所有值,但我仍然收到运行时错误和 PermissionError。我想我会在 SE 中提出另一个问题。
    • 如果没有追溯信息,就无能为力。
    【解决方案2】:

    可能最简单的方法是在单独的子进程中运行每个繁重的模拟,并由父进程监视它。具体来说:

    def risky_simulation(path):
        ...
    
    def safe_simulation(path):
        p = multiprocessing.Process(target=risky_simulation, args=(path,))
        p.start()
        p.join(timeout)  # Your timeout here
        p.kill()  # or p.terminate()
        # Here read and return the output of the simulation
        # Can be from a file, or using some communication object
        # between processes, from the `multiprocessing` module
    
    with Pool() as pool:
        res = pool.map_async(safe_simulation, paths)
        all_costs = res.get()
    

    注意事项:

    1. 如果模拟可能挂起,您可能希望在单独的进程中运行它(即 Process 对象不应是线程),因为它可能会捕获 GIL。
    2. 此解决方案仅将池用于直接子进程,但计算被卸载到新进程。我们还可以确保计算共享一个池,但这会导致代码更丑陋,所以我跳过了它。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-03-26
      • 2012-04-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-07-19
      • 2016-08-30
      • 1970-01-01
      相关资源
      最近更新 更多