【问题标题】:Why my parallel code is slower than the sequential为什么我的并行代码比顺序代码慢
【发布时间】:2018-08-16 21:12:25
【问题描述】:

我正在尝试实现一种高度可并行化的在线递归并行算法。我的问题是我的 python 实现不能按我的意愿工作。我有两个 2D 矩阵,每次在时间步 t 观察到新的观察值时,我想递归地更新每一列。 我的并行代码是这样的

def apply_async(t):
    worker =  mp.Pool(processes = 4)
    for i in range(4):
        X[:,i,np.newaxis], b[:,i,np.newaxis] =  worker.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis])).get()


    worker.close()
    worker.join()      




for t in range(p,T):
    count = 0 
    for l in range(p):
        for k in range(4):
            gn[count]=train[t-l-1,k]
            count+=1
    G = G*v +  gn @ gn.T
    Gt = (1/(t-p+1))*G

    if __name__ == '__main__':
        apply_async(t)

这两个矩阵是 X 和 b。我想直接在主内存上替换,因为每个进程递归地只更新矩阵的一个特定列。

为什么这个实现比顺序的慢?

有没有办法在每个时间步恢复过程而不是杀死它们并重新创建它们?这可能是它变慢的原因吗?

【问题讨论】:

  • worker.apply_async() 的任何参数是否可能是大型数据结构?
  • 大是什么意思? X 和 b 是大型 2D 矩阵,但作为参数,我只传递其中的一列。那我不这么认为
  • 好的。我想我找到了您的问题,请参阅下面的答案。您需要根据您的代码和数据结构调整解决方案,但示例应说明问题出在哪里。
  • 你能修复缩进吗?那里有几个错误。您能否举例说明您要修改的矩阵?另外,apply_async 已经是 Pool 的一个方法,请检查一下。您的函数递归调用自身并每次生成一个池。这看起来根本不对。
  • 我想要实现的是递归的,正如我所说的。从我的最后一个问题很明显,我知道这一点,这就是为什么我问“有没有办法恢复每个时间步的过程,而不是杀死它们并再次创建它们?”。因为我想阻止它递归调用自己。无论如何我可以调用相同的进程来更新矩阵的同一列而不是杀死它并一次又一次地创建它?

标签: python python-3.x parallel-processing multiprocessing


【解决方案1】:

原因是,您的程序实际上是顺序的。这是一个示例代码 sn-p 从并行性的角度来看与您的相同:

from multiprocessing import Pool
from time import sleep

def gwork( qq):
    print (qq)
    sleep(1)
    return 42

p = Pool(processes=4)

for q in range(1, 10):
    p.apply_async(gwork, args=(q,)).get()
p.close()
p.join()

运行此程序,您会注意到数字 1-9 在一秒钟内恰好出现一次。为什么是这样?原因是你的.get()。这意味着每次调用 apply_async 实际上都会阻塞get(),直到有结果可用。它将提交一个任务,等待第二个模拟处理延迟,然后返回结果,然后将另一个任务提交到您的池中。这意味着根本没有并行执行。

尝试用这个替换池管理部分:

results = []
for q in range(1, 10):
    res = p.apply_async(gwork, args=(q,))
    results.append(res)
p.close()
p.join()
for r in results:
    print (r.get())

您现在可以看到并行工作,因为您的四个任务现在同时处理。您的循环不会阻塞在 get 中,因为 get 已移出循环,并且只有在准备好时才会收到结果。

注意:如果您向工作人员提供的参数或它们的返回值是大型数据结构,您将失去一些性能。在实践中,Python 将这些实现为队列,与子进程分叉时获取数据结构的内存副本相比,通过队列传输大量数据相对而言速度较慢。

【讨论】:

  • 感谢您的快速回复。如果我使用它,如何确保我的结果将按正确的顺序排列?如果我想立即更换主人的记忆,有什么办法吗?
  • 正如我所写,我不熟悉您的数据结构。也许您可以使用结果字典而不是结果列表来存储键/值对,以便您可以从键将结果与源数据匹配。在这方面我可能帮不上什么忙,我只能解释为什么你的程序根本不并行。
  • 更好的方法是转储 multiprocessing.pool 并将其替换为 concurrent.futures 中的 ProcessPoolExecutor。它的接口几乎与 multiprocessing.pool 相同,它允许在任务完成时附加回调。然后,您可以在任务完成时立即在该回调函数中进行内存替换。
  • 一个池执行器接受一个任务,处理它并吐出结果。您可以拥有一个全局池,而不是为每次迭代声明一个新池,但是如果您希望任务只是“挂”在那里并做某事而不是完成并返回结果,那么您不应该使用 pool.但是,当任务终止时,池执行器不会终止。如果您的池是全局的并且您没有关闭并加入它,那么进程将在那里等待新任务的出现。
  • 不,没有,但您可以使用您的工作人员作为传递方法。您可以将列号作为参数传递给您的工作人员,并让工作人员将其与实际结果一起原封不动地返回。现在您的回调函数可以在将来的结果中访问它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-11-26
  • 2021-06-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-02-15
  • 1970-01-01
相关资源
最近更新 更多