【问题标题】:python multiprocessing pool retriespython多处理池重试
【发布时间】:2012-07-24 19:06:48
【问题描述】:

如果原始计算失败,有没有办法使用简单的池重新发送一条数据进行处理?

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])

【问题讨论】:

  • 也许您想return f(x) 而不是提出ValueError?只是猜测......
  • 在您的实际应用中失败的几率有多高?也就是说,与等待其他进程先完成相比,该进程立即重试有多重要?
  • 这是一个中等失败的机会,不需要立即重试(但最终应该并行重试)。

标签: python multiprocessing


【解决方案1】:

如果您可以(或不介意)立即重试,请使用包装函数的装饰器:

import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])

【讨论】:

    【解决方案2】:

    您可以使用Queue 通过启动Process 中的循环将失败反馈到Pool

    import multiprocessing as mp
    import random
    
    def f(x):
        if random.getrandbits(1):
            # on failure / exception catch
            f.q.put(x)
            return None
        return x*x
    
    def f_init(q):
        f.q = q
    
    def main(pending):
        total_items = len(pending)
        successful = []
        failure_tracker = []
    
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, pending)
        retry_results = []
        while len(successful) < total_items:
            successful.extend([r for r in results if not r is None])
            successful.extend([r for r in retry_results if not r is None])
            failed_items = []
            while not q.empty():
                failed_items.append(q.get())
            if failed_items:
                failure_tracker.append(failed_items)
                retry_results = p.imap(f, failed_items);
        p.close()
        p.join()
    
        print "Results: %s" % successful
        print "Failures: %s" % failure_tracker
    
    if __name__ == '__main__':
        main(range(1, 10))
    

    输出是这样的:

    Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
    Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]
    

    Pool 不能在多个进程之间共享。因此,这种基于Queue 的方法。如果您尝试将池作为参数传递给池进程,则会收到此错误:

    NotImplementedError: pool objects cannot be passed between processes or pickled
    

    您也可以在您的函数f 中尝试立即重试几次,以避免同步开销。这实际上是您的函数应该等待多长时间重试,以及如果立即重试成功的可能性有多大。


    旧答案: 为了完整起见,这是我的旧答案,它不如直接重新提交到池中最佳,但可能仍然取决于用例,因为它提供了一种自然的方式来处理/限制n-level 重试:

    您可以使用Queue 汇总失败并在每次运行结束时重新提交,在多次运行中:

    import multiprocessing as mp
    import random
    
    
    def f(x):
        if random.getrandbits(1):
            # on failure / exception catch
            f.q.put(x)
            return None
        return x*x
    
    def f_init(q):
        f.q = q
    
    def main(pending):
        run_number = 1
        while pending:
            jobs = pending
            pending = []
    
            q = mp.Queue()
            p = mp.Pool(None, f_init, [q])
            results = p.imap(f, jobs)
            p.close()
    
            p.join()
            failed_items = []
            while not q.empty():
                failed_items.append(q.get())
            successful = [r for r in results if not r is None]
            print "(%d) Succeeded: %s" % (run_number, successful)
            print "(%d) Failed:    %s" % (run_number, failed_items)
            print
            pending = failed_items
            run_number += 1
    
    if __name__ == '__main__':
        main(range(1, 10))
    

    输出如下:

    (1) Succeeded: [9, 16, 36, 81]
    (1) Failed:    [2, 1, 5, 7, 8]
    
    (2) Succeeded: [64]
    (2) Failed:    [2, 1, 5, 7]
    
    (3) Succeeded: [1, 25]
    (3) Failed:    [2, 7]
    
    (4) Succeeded: [49]
    (4) Failed:    [2]
    
    (5) Succeeded: [4]
    (5) Failed:    []
    

    【讨论】:

    • 将我的答案更新为不需要多次运行的答案,现在可以在同一个原始池上工作。
    • 感谢您的详细回复。我喜欢将失败的计算放入队列中重试的想法。我必须奖励 Andrew 赏金,因为他的解决方案做了一个简单的重试。
    • @ash 我在回复中确实提到了立即重试,认为这将是一个微不足道/简单的添加,而不是您想要的。另请注意,它(立即重试)并非对所有情况都是最佳的,尤其是那些立即重试成功机会低的情况(在这种情况下,它非常次优,因为它会导致可能成功的工作资源匮乏。)恭喜安德鲁无论如何。
    猜你喜欢
    • 2021-07-24
    • 2020-08-14
    • 2016-11-10
    • 2017-04-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-08-25
    • 2016-12-07
    相关资源
    最近更新 更多