【Question title】:Python parallel execution - how to debug efficiently?
【Posted】:2021-01-24 20:24:26
【Question description】:

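Below is a Python program that demonstrates how to iterate a function func in parallel using multiprocessing.Pool. Np is the number of elements to iterate over. The function func merely returns Np minus the index of the iterable. As you can see, I use a queue to return the values from the function when running in parallel mode.

If I set runParallel=False, the program runs in serial mode.

The program runs fine for both runParallel=False and runParallel=True, but now comes the essential problem I have: as shown below, if problemIndex is set lower than Np (e.g. problemIndex=7), then I trigger a floating point exception. I divide by zero - stupid me :-)

If I run with runParallel=False, I can see the source line number of the bug and catch it directly.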

$ python map.py 
Traceback (most recent call last):
  File "map.py", line 63, in <module>
    a = func(argList[p])
  File "map.py", line 22, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero

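Nice!

But with runParallel=True, I just end up in the "Bummer" print section with no indication of where the error came from. Annoying!

My question is: for runParallel=True, how can I debug this efficiently and get the line number of the offending line of code back from Pool()?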

#!/usr/bin/python
# map.py
import time
import multiprocessing
import sys
import random

# Toggle whether we run parallel or not
runParallel = True

# Problematic index - if less than Np we create an exception
problemIndex = 13

# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret  = args["Np"] - args["index"]
    # Emulate a bug 
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])
    # Return data
    if args["runParallel"]:
        # We use a queue thus ordering may not be protected
        args["q"].put((args["index"],ret))
    else:
        return ret

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args={}
    args["index"] = i # index
    args["Np"] = Np   # Number of problems
    args["q"] = q     # return queue for parallel execution mode
    args["problemIndex"] = problemIndex  # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel    # should we run parallel
    argList.append(args)

#should we run parallel
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(func, argList)
    ret.wait()
    qLen = q.qsize()
    p.close()    
    if not qLen == Np:
        print "Bummer - one of more worker threads broke down",Np,qLen
        sys.exit(0)

resultVector = [None]*Np
for p in range(Np):
    if runParallel:
        (i,a) = q.get(timeout=0.1)
    else:
        i = p
        a = func(argList[p])
    resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives",resultVector[i]

【Question comments】:

Tags: python multiprocessing


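【Solution 1】:

I've found the traceback module really useful for debugging multiprocessing code. If you pass an exception back to the main thread/process you lose all the traceback information, so you need to call traceback.format_exc in the child and pass that text back to the main process together with the exception. Below I include a pattern that can be used with Pool.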

import traceback
import multiprocessing as mp
import time

def mpFunctionReportError(kwargs):
    '''
    wrap any function and catch any errors from f, 
    putting them in pipe instead of raising
    kwargs must contain 'queue' (multiprocessing queue) 
                    and 'f' function to be run
    '''
    queue = kwargs.pop('queue')
    f = kwargs.pop('f')
    rslt=None
    try:
        rslt = f(**kwargs)
        queue.put(rslt)
    except Exception as e:
        queue.put([e,traceback.format_exc()])
    return

def doNothing(a):
    return a

def raiseException(a):
    a='argh'
    raise ValueError('this is bad')


manager = mp.Manager()
outQ = manager.Queue()
p = mp.Pool(processes=4)

ret = p.map_async(mpFunctionReportError,iterable=[dict(f=doNothing,queue=outQ,a='pointless!') for i in xrange(4)])
ret.wait()
time.sleep(1)
for i in xrange(4):
    print(outQ.get_nowait())

ret = p.map_async(mpFunctionReportError,iterable=[dict(f=raiseException,queue=outQ,a='pointless!') for i in xrange(2)])
ret.wait()
time.sleep(1)
for i in xrange(2):
    e,trace = outQ.get_nowait()
    print(e)
    print(trace)

Running this example gives:

pointless!
pointless!
pointless!
pointless!
this is bad
Traceback (most recent call last):
  File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/mpDemo.py", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad

this is bad
Traceback (most recent call last):
  File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/mpDemo.py", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad
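
Why the traceback has to be turned into text in the child can be seen without a pool at all. The following is a minimal sketch, assuming Python 3 (where exceptions carry a `__traceback__` attribute); the function name `fail` is a hypothetical stand-in. It pickles an exception, which is roughly what happens when the object travels between processes, and shows that the traceback does not survive the round trip while the formatted text does.

```python
import pickle
import traceback


def fail():
    # Hypothetical failing function, standing in for the real worker.
    raise ValueError("this is bad")


try:
    fail()
except ValueError as exc:
    # Keep a reference: the name bound by "as" is cleared after the block.
    original = exc
    # Text rendering of the traceback, produced where the exception was raised.
    trace_text = traceback.format_exc()

# Simulate sending the exception object to another process.
restored = pickle.loads(pickle.dumps(original))

print("original has traceback:", original.__traceback__ is not None)
print("restored has traceback:", restored.__traceback__ is not None)
print("message survives:", str(restored))
print(trace_text)
```

The exception type and message survive pickling, but the stack information is only available as the string produced by `traceback.format_exc()`, which is why the pattern above sends that string alongside the exception.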

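【Comments】:

  • Sorry, I don't quite follow. I like the direction. How does this fit into the question? I guess at least the line "ret = p.map_async(func, argList)" needs to be updated.
  • @PeterToft Sorry for being cryptic - I don't use Pool very often, so I hadn't specifically applied my pattern to it. There is a working example above now.
【Solution 2】:

It's not very elegant, but how about: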

import logging

def func(args):
  try:
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret  = args["Np"] - args["index"]
    # Emulate a bug 
    if args["index"]==args["problemIndex"]:
      ret = 1/(args["index"]-args["problemIndex"])
    # Return data
    if args["runParallel"]:
      # We use a queue thus ordering may not be protected
      args["q"].put((args["index"],ret))
    else:
      return ret
  except Exception as e:
    logging.exception(e)
    raise

The output should look like this (for problemIndex=9):

ERROR:root:integer division or modulo by zero
Traceback (most recent call last):
  File "/home/rciorba/test.py", line 26, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Bummer - one of more worker threads broke down 10 9
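
The log-and-re-raise idea can also be kept outside the worker body. The sketch below is only an illustration under stated assumptions: the names `divide` and `logged_divide` and the log format are hypothetical and not from the answer. It also relies on one detail of the pool API: `wait()` on the object returned by `map_async` only blocks until the work has finished, whereas `get()` re-raises a worker's exception in the parent.

```python
import logging
import multiprocessing

# Assumed format: include the process name so it is clear which worker logged.
logging.basicConfig(
    level=logging.ERROR,
    format="%(processName)s %(levelname)s %(message)s",
)


def divide(pair):
    # Hypothetical worker that fails when the divisor is zero.
    a, b = pair
    return a // b


def logged_divide(pair):
    # Log the full traceback in the worker process, then re-raise
    # so the parent still learns that the task failed.
    try:
        return divide(pair)
    except Exception:
        logging.exception("divide failed for args %r", pair)
        raise


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    async_result = pool.map_async(logged_divide, [(4, 2), (1, 0)])
    pool.close()
    try:
        # get() re-raises the worker's exception; wait() would not.
        print(async_result.get())
    except ZeroDivisionError as exc:
        print("re-raised in parent: %r" % (exc,))
    pool.join()
```

Logging the arguments together with the traceback makes it easier to tell which of the parallel tasks failed.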

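【Comments】:

  • For problemIndex = 9 I only get "Bummer - one of more worker threads broke down 10 7", i.e. no "real" improvement
  • That's strange, it should print the full traceback. I've edited my answer to include a sample of what it produces for me
  • I'm testing with Python 2.7.3
  • I see the same with Python 3.2 on Linux. I can't verify your idea, but I suspect this way of extending the code would work.
  • Got it - I (obviously) needed to add "import logging", and for debugging I would also add "logging.error(args)"
【Solution 3】:

John Greenall gave the best solution, and the bounty has been awarded.

The reason is that his solution does not put a try/except around the central part of the code, i.e. the whole of "func", as radu.ciorba showed us. That other way does work too, however.

Since John's solution is not a 100% fit for my question, I am posting a solution in my own code where I have applied John's approach. Thanks again John, and thanks to Radu as well!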

#!/usr/bin/python
# map.py solution
import time
import multiprocessing
import sys
import random
import logging
import traceback

# Toggle whether we run parallel or not
runParallel = True

# Problematic index - if less than Np we create an exception
problemIndex = 14

# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret  = args["Np"] - args["index"]

    # Emulate a bug 
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])

    # Return data
    return (args["index"],ret)

def mpFunctionReportError(args):
    rslt=None
    q = args["q"]
    rslt = {"index":args["index"],
            "args":None,
            "error":None, 
            "traceback":None}
    try:
        rslt["result"] = func(args)
        q.put(rslt)
    except Exception as e:
        rslt["result"] = None
        rslt["error"] = e
        rslt["args"] = str(args)
        rslt["traceback"] = traceback.format_exc()
        q.put(rslt)

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args={}
    args["index"] = i # index
    args["Np"] = Np   # Number of problems
    args["q"] = q     # return queue for parallel execution mode
    args["problemIndex"] = problemIndex  # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel    # should we run parallel
    argList.append(args)


resultVector = [None]*Np

#should we run parallel
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(mpFunctionReportError, argList)
    # Wait until error or done
    ret.wait()
    # Queue size
    qLen = q.qsize()
    p.close()    
    # List for the errors
    bugList = {}
    # Loop the queue
    for i in range(qLen):
        # Pop a value
        returnVal = q.get()
        # Check for the error code
        if returnVal["error"] is not None:
            bugList[returnVal["index"]] = returnVal
        else:
            resultVector[returnVal["index"]] = returnVal["result"]

    # Print the list of errors
    if bugList:        
        print "-"*70
        print "Some parts of the parallel execution broke down. Error list:"
        print "-"*70
        for i in bugList:
            print "Index :",bugList[i]["index"]
            print "Error code :",bugList[i]["error"]
            print "Traceback :",bugList[i]["traceback"]
            print "Args :",bugList[i]["args"]
            print "-"*70
        sys.exit(0)
else:
    # Serial mode: call func directly so any exception surfaces with its own traceback
    for p in range(Np):
        resultVector[p] = func(argList[p])

for i in range(Np):
    print "Index", i, "gives",resultVector[i]

When it breaks with "runParallel = True" and "problemIndex = 4", we now have the full trace information:

----------------------------------------------------------------------
Some parts of the parallel execution broke down. Error list:
----------------------------------------------------------------------
Index : 4
Error code : integer division or modulo by zero
Traceback : Traceback (most recent call last):
  File "fix3.py", line 44, in mpFunctionReportError
    rslt["result"] = func(args)
  File "fix3.py", line 26, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero

Args : {'Np': 10, 'index': 4, 'problemIndex': 4, 'q': <AutoProxy[Queue] object, typeid 'Queue' at 0xb708710c>, 'runParallel': True}
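
The wrapper above is tied to func and to the shared queue. A generic variant might look like the following minimal sketch, in which the names `call_and_report` and `reciprocal` are hypothetical. It assumes the reports can come back through the return value of `map` rather than a queue, and it stores the error as a string so that nothing depends on the exception object being picklable.

```python
import functools
import multiprocessing
import traceback


def call_and_report(target, arg):
    """Call target(arg); return a dict holding either the result or the error text."""
    report = {"arg": arg, "result": None, "error": None, "traceback": None}
    try:
        report["result"] = target(arg)
    except Exception as exc:
        report["error"] = repr(exc)
        # Format the traceback here, in the process where the error happened.
        report["traceback"] = traceback.format_exc()
    return report


def reciprocal(x):
    # Hypothetical stand-in for the real compute function.
    return 1.0 / x


if __name__ == "__main__":
    # functools.partial binds the target, so the pool sees a one-argument callable.
    worker = functools.partial(call_and_report, reciprocal)
    pool = multiprocessing.Pool(processes=2)
    try:
        reports = pool.map(worker, [1, 0, 4])
    finally:
        pool.close()
        pool.join()
    for report in reports:
        if report["error"] is None:
            print("arg %r gives %r" % (report["arg"], report["result"]))
        else:
            print("arg %r failed: %s" % (report["arg"], report["error"]))
            print(report["traceback"])
```

Because `map` returns the reports in input order, no index bookkeeping is needed to match results to arguments.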

【Comments】:
