首先,您的第一个错误是没有从您的func() 函数返回任何内容。在 python 中,所有值都是对内存中对象的引用,当您进行赋值时,您将用新对象替换引用的值。所以做M = -M并不是改变M的对象,而是创建一个新的,在函数范围内改变引用M。
这意味着你的函数将总是返回None:
>>> from multiprocessing import Pool
>>> def func(M): # if you call with M=1
... # M==1 here
... M = -M
... # M==-1 here
...
>>> M = 1
>>> M = func(M)
>>> print(M)
None
要解决此问题,您需要将其 返回 值:
>>> def func(M):
... return -M
...
>>> print(func(1))
-1
然后,并行化工作的最佳方法是使用进程池,以便您可以控制并行运行的实例数量,直接适应 from the documentation examples:
>>> def func(M):
>>> return -M
...
>>> pool = Pool(processes=4) # start 4 worker processes
>>> results = []
>>> for i in range(1,100):
... results.append(pool.apply_async(func, [i])) # evaluate "func(i)" asynchronously
...
>>> print [result.get() for result in results]
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
做同样事情的另一种方法是使用:
>>> print(pool.map(func, range(1,100)))
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
话虽如此,如果您所做的一切确实是否定值(或一些简单的东西)——我猜你不是——那么最好不使用并行化,因为python 和你的微处理器会将工作向量化并让它运行得更快:
>>> print([-M for M in range(1,100)])
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
这是一个指标,在我的机器上使用 python2 运行:
>>> from timeit import timeit
>>> timeit("[-M for M in range(1,100)]", number=100000)
0.6327948570251465
>>> def test():
... pool.map(func, range(1,100))
...
>>> timeit(test, number=100000)
31.26303195953369
编辑:
您的问题的问题在于,您想要实现的目标以及您想要做的事情都不是很清楚。通常并行化的第一条路径是做一个非并行化的版本,并尝试并行化那些工作得更好并且没有互斥的东西。
但是在您的代码中让我印象深刻的一件事是,对于您的 range(1,100) 循环的每次迭代,您实际上都在等待一个进程完成,然后再启动一个新进程:
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
# Start a process
p.start()
# wait for the process p to finish before going on
p.join()
# will continue when p has finished
如果您想解决这个问题,您可以使用Pool,如我的示例所示,或者:
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
# Start a process
p.start()
# wait for the process p to finish before going on
p.join()
# will continue when p has finished
所以快速改进是:
processes = [] # keep a list of all the processes
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
for j in range(1,i):
p = mp.Process(target = func, args = (coor_i, j))
# Append to processes list
processes.append(p)
# Start a process
p.start()
# wait for all processes to have finished before quitting (or going on)
for p in processes:
p.join()
所以我刚刚更新了代码:在一个循环中启动所有进程,在另一个循环中阻塞直到它们完成。 .join() 使当前进程阻塞,直到另一个进程完成,所以第二个循环确保每个进程在完成 sn-p 之前完成,或者继续执行进一步的代码。
因此,对于“生成器”问题,我没有注意到您在 mp.Process 行上执行了 for j in range(1, i),因此没有将进程附加到进程列表,而是一个列表生成器。
实际上你的 conde CANNOT 工作,并且应该在编译时失败,因为:
p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i)
^
SyntaxError: invalid syntax
如果您通过以下方式更正此问题:
>>> (p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i))
那么你的原始代码不能工作,因为p 不是一个进程,它是一个列表生成器,p.start() 不能存在:
>>> p.next()
<Process(Process-5, initial)>
>>> p.next()
<Process(Process-6, initial)>
>>> p.next()
<Process(Process-7, initial)>
>>> p.next()
<Process(Process-8, initial)>
>>> p.next()
<Process(Process-9, initial)>
因此,在这种情况下不使用 for one 衬里是一个坏主意,使用传统的 for 进行纠正。