[Posted]: 2021-01-24 20:24:26
[Question]:
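Below is a Python program that shows how I use multiprocessing.Pool to run a function func in parallel over a list of inputs. Np is the number of elements to iterate over. The function func simply returns Np minus the index of the current element. As you can see, in parallel mode I use a queue to return the values from the function.
If I set runParallel=False, the program runs in serial mode instead.
The program works fine with both runParallel=False and runParallel=True, but now comes my real problem: as shown below, if I set problemIndex below Np (e.g. problemIndex=7), I trigger a division-by-zero exception. Silly me :-)
With runParallel=False, I can see the source line number of the error and catch it directly: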
$ python map.py
Traceback (most recent call last):
  File "map.py", line 63, in <module>
    a = func(argList[p])
  File "map.py", line 22, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Nice!
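In serial mode the exception propagates normally, so the failing line can also be read programmatically rather than only from the printed traceback. A minimal sketch, assuming Python 3.5+ (for the FrameSummary attributes); func here is a hypothetical, simplified stand-in for the question's func, and failing_line is a hypothetical helper:

```python
import traceback


def func(index, Np=10, problemIndex=7):
    """Simplified, hypothetical version of the question's func."""
    ret = Np - index
    if index == problemIndex:
        ret = 1 / (index - problemIndex)  # deliberate division by zero
    return ret


def failing_line(index):
    """Return (function name, line number) of the innermost frame that
    raised, or None if func(index) succeeded."""
    try:
        func(index)
    except ZeroDivisionError as exc:
        # extract_tb turns the traceback into FrameSummary objects;
        # the last one is the frame where the exception was raised.
        last = traceback.extract_tb(exc.__traceback__)[-1]
        return last.name, last.lineno
    return None


if __name__ == "__main__":
    print(failing_line(7))
```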
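But with runParallel=True, I just end up in the "Bummer" print branch, with no indication of where the error came from. Annoying!
My question is: with runParallel=True, how can I debug this efficiently and get the line number of the failing code out of Pool()?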
#!/usr/bin/python
# map.py
import time
import multiprocessing
import sys
import random

# Toggle whether we run parallel or not
runParallel = True
# Problematic index - if less than Np we create an exception
problemIndex = 13
# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret = args["Np"] - args["index"]
    # Emulate a bug
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])
    # Return data
    if args["runParallel"]:
        # We use a queue thus ordering may not be protected
        args["q"].put((args["index"],ret))
    else:
        return ret

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args={}
    args["index"] = i # index
    args["Np"] = Np # Number of problems
    args["q"] = q # return queue for parallel execution mode
    args["problemIndex"] = problemIndex # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel # should we run parallel
    argList.append(args)

# Should we run in parallel?
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(func, argList)
    ret.wait()
    qLen = q.qsize()
    p.close()
    if not qLen == Np:
        print "Bummer - one or more worker threads broke down",Np,qLen
        sys.exit(0)

resultVector = [None]*Np
for p in range(Np):
    if runParallel:
        (i,a) = q.get(timeout=0.1)
    else:
        i = p
        a = func(argList[p])
    resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives",resultVector[i]
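In the program above, a worker that raises never reaches args["q"].put(...), so the queue ends up shorter than Np and only the "Bummer" branch fires. Per the multiprocessing documentation, AsyncResult.wait() only blocks, while AsyncResult.get() re-raises an exception raised by the remote call, so with wait() the exception stays hidden inside ret. A minimal sketch of one way to surface the worker's line number, assuming Python 3 and an environment where Pool can start worker processes from a script: it keeps a simplified copy of func (no sleep, no queue) and adds a hypothetical wrapper safe_func that catches any exception inside the worker and returns traceback.format_exc(), which includes the file name, line number and source line of the failure. Because Pool.map returns results in input order, the Manager queue is not needed here.

```python
import multiprocessing
import traceback


def func(args):
    """Simplified version of the question's func: returns Np - index,
    but divides by zero when index == problemIndex."""
    ret = args["Np"] - args["index"]
    if args["index"] == args["problemIndex"]:
        ret = 1 / (args["index"] - args["problemIndex"])
    return ret


def safe_func(args):
    """Hypothetical wrapper: run func inside the worker and, instead of
    letting the exception vanish, return the formatted traceback text."""
    try:
        return (args["index"], func(args), None)
    except Exception:
        return (args["index"], None, traceback.format_exc())


def run_parallel(arg_list, processes=4):
    """Map safe_func over arg_list; results come back in input order."""
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(safe_func, arg_list)


if __name__ == "__main__":
    Np, problemIndex = 10, 7
    arg_list = [{"index": i, "Np": Np, "problemIndex": problemIndex}
                for i in range(Np)]
    for index, value, tb in run_parallel(arg_list):
        if tb is not None:
            print("Index", index, "failed:\n" + tb)
        else:
            print("Index", index, "gives", value)
```

Running it prints one line per index, and index 7 prints the full traceback pointing at the division inside func. The design choice is to turn failures into ordinary return values, so one bad input neither kills the run nor hides the results of the other inputs.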
[Discussion]:
- There is a related question about debugging MPI Python programs: stackoverflow.com/questions/46856327/…