【问题标题】:multiprocessing python : how to split jobs多处理 python:如何拆分作业
【发布时间】:2015-11-08 19:07:51
【问题描述】:

我尝试编写一个用于多处理的 python。一直在网上看,还是不明白怎么写。

我的脚本:

import multiprocessing as mp
import numpy as np
import ctypes

def func(M, j, :
    coor_i = np.zeros(1000)
    coor_j = np.ones(1000) # In reality it is loaded from a txt
    A = np.square(coor_i - coor_j)
    a = A.sum
    M[j] = a

 for i in range(1,100) :
     M = mp.Array(ctypes.c_double, np.ones(i))
     p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)

     p.start()
     p.join()

     print(M)

我在网上看了一下,我看到了 - 'mp.Pool'、'processes'、'mp.Queue'

非常感谢。

【问题讨论】:

  • 首先 - 您在这里遇到的实际问题是什么?其次 - 由于缩进已损坏,此代码在语法上无效。
  • 你需要先修复缩进

标签: python multiprocessing python-multiprocessing


【解决方案1】:

首先,您的第一个错误是没有从您的func() 函数返回任何内容。在 python 中,所有值都是对内存中对象的引用,当您进行赋值时,您将用新对象替换引用的值。所以做M = -M并不是改变M的对象,而是创建一个新的,在函数范围内改变引用M

这意味着你的函数将总是返回None

>>> from multiprocessing import Pool
>>> def func(M): # if you call with M=1
...    # M==1 here
...    M = -M
...    # M==-1 here
...
>>> M = 1
>>> M = func(M)
>>> print(M)
None

要解决此问题,您需要将其 返回 值:

>>> def func(M):
...     return -M
...
>>> print(func(1))
-1

然后,并行化工作的最佳方法是使用进程池,以便您可以控制并行运行的实例数量,直接适应 from the documentation examples

>>> def func(M):
>>>     return -M
... 
>>> pool = Pool(processes=4)                # start 4 worker processes
>>> results = []
>>> for i in range(1,100):
...     results.append(pool.apply_async(func, [i]))    # evaluate "func(i)" asynchronously
...
>>> print [result.get() for result in results]
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]

做同样事情的另一种方法是使用:

>>> print(pool.map(func, range(1,100)))
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]

话虽如此,如果您所做的一切确实是否定值(或一些简单的东西)——我猜你不是——那么最好使用并行化,因为python 和你的微处理器会将工作向量化并让它运行得更快:

>>> print([-M for M in range(1,100)])
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]

这是一个指标,在我的机器上使用 python2 运行:

>>> from timeit import timeit
>>> timeit("[-M for M in range(1,100)]", number=100000)
0.6327948570251465
>>> def test():
...     pool.map(func, range(1,100))
...
>>> timeit(test, number=100000)
31.26303195953369

编辑:

您的问题的问题在于,您想要实现的目标以及您想要做的事情都不是很清楚。通常并行化的第一条路径是做一个非并行化的版本,并尝试并行化那些工作得更好并且没有互斥的东西。

但是在您的代码中让我印象深刻的一件事是,对于您的 range(1,100) 循环的每次迭代,您实际上都在等待一个进程完成,然后再启动一个新进程:

 for i in range(1,100) :
     M = mp.Array(ctypes.c_double, np.ones(i))
     # Create process
     p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)

     # Start a process
     p.start()
     # wait for the process p to finish before going on
     p.join()
     # will continue when p has finished

如果您想解决这个问题,您可以使用Pool,如我的示例所示,或者:

 for i in range(1,100) :
     M = mp.Array(ctypes.c_double, np.ones(i))
     # Create process
     p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)

     # Start a process
     p.start()
     # wait for the process p to finish before going on
     p.join()
     # will continue when p has finished

所以快速改进是:

 processes = [] # keep a list of all the processes
 for i in range(1,100) :
     M = mp.Array(ctypes.c_double, np.ones(i))
     # Create process
     for j in range(1,i):
         p = mp.Process(target = func, args = (coor_i, j))
         # Append to processes list
         processes.append(p)
         # Start a process
         p.start()

# wait for all processes to have finished before quitting (or going on)
for p in processes:
    p.join() 

所以我刚刚更新了代码:在一个循环中启动所有进程,在另一个循环中阻塞直到它们完成。 .join() 使当前进程阻塞,直到另一个进程完成,所以第二个循环确保每个进程在完成 sn-p 之前完成,或者继续执行进一步的代码。


因此,对于“生成器”问题,我没有注意到您在 mp.Process 行上执行了 for j in range(1, i),因此没有将进程附加到进程列表,而是一个列表生成器。

实际上你的 conde CANNOT 工作,并且应该在编译时失败,因为:

p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i)
                                               ^
SyntaxError: invalid syntax

如果您通过以下方式更正此问题:

>>> (p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i))

那么你的原始代码不能工作,因为p 不是一个进程,它是一个列表生成器,p.start() 不能存在:

>>> p.next()
<Process(Process-5, initial)>
>>> p.next()
<Process(Process-6, initial)>
>>> p.next()
<Process(Process-7, initial)>
>>> p.next()
<Process(Process-8, initial)>
>>> p.next()
<Process(Process-9, initial)>

因此,在这种情况下不使用 for one 衬里是一个坏主意,使用传统的 for 进行纠正。

【讨论】:

  • 抱歉,我的代码应该更准确一些。我已经对其进行了编辑,因此它与我的真实代码更相似。我正在尝试的是并行化 for 循环,而不是函数本身。
  • 不知何故,有一条关于 - processes[-1].start() - 'generator' object has no attribute 'start' 的错误消息你介意解释为什么“for p”循环应该放在“for i”循环之外?为什么“进程[-1]”需要-1?
  • 我没有尝试代码,实际上我看到了什么问题......但我必须去,我稍后会回来修复它
  • 这是个好消息。谢谢。我应该说得更清楚。我知道如何做 mp.Array,但不知何故它只是不并行。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-11-11
  • 1970-01-01
  • 1970-01-01
  • 2016-10-30
  • 1970-01-01
  • 2016-06-05
相关资源
最近更新 更多