【问题标题】:Why is there no speed-up when using pythons multiprocessing for embarassingly parallel problem within a for-loop, with shared numpy data?为什么在 for 循环中使用 python 的多处理来解决令人尴尬的并行问题并共享 numpy 数据时没有加速?
【发布时间】:2010-12-07 03:29:22
【问题描述】:

我想加速一个与贝叶斯推理相关的令人尴尬的并行问题。目的是推断一组图像 x 的系数 u,给定矩阵 A,使得 X = A*U。 X 具有尺寸 mxn、A mxp 和 U pxn。对于 X 的每一列,必须推断系数 U 的最优对应列。最后,此信息用于更新 A。我使用 m = 3000,p = 1500 和 n = 100。 因此,由于它是一个线性模型,系数矩阵 u 的推断由 n 个独立计算组成。因此,我尝试使用 Python 的多处理模块,但没有加速。

这是我所做的:

没有并行化的主要结构是:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

多处理的实现:

我尝试实现多处理。我有一台可以使用的 8 核机器。

  1. 有 3 个 for 循环。唯一似乎“可并行化”的是第三个,其中推断了系数:
    • 生成一个队列并将迭代数从 0 到 batch_size-1 堆叠到队列中
    • 生成8个进程,让它们通过队列工作
  2. 使用 multiprocessing.Array 共享数据 U

因此,我将第三个循环替换为以下内容:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

这是 Wrap_mp 类:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()

这里是函数 infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break

现在的问题如下:

  1. 对于给定的数据维度,多处理版本并不比单线程版本快。
  2. 进程 ID 随每次迭代而增加。这是否意味着不断产生新的进程?这不会产生巨大的开销吗?我怎样才能避免这种情况?是否有可能在整个 for-loop 中创建 8 个不同的进程并用数据更新它们?
  3. 我在进程之间共享系数 U 的方式是否会减慢计算速度?还有其他更好的方法吗?
  4. 进程池会更好吗?

我非常感谢任何形式的帮助!一个月前我开始使用 Python,现在很迷茫。

引擎

【问题讨论】:

  • 工作调度真的需要一次提交一份吗?预先为每个核心安排多个工作单元是否不公平?我怀疑如果您从进程池中看到的改进很少,那么大量时间都花在进程队列中的锁争用上。

标签: python multiprocessing


【解决方案1】:

每次创建流程时,您都在创建新流程。如果您在 for 循环中执行此操作,那么是的,您每次都在循环中启动新进程。听起来您想要做的是在循环外初始化队列和进程,然后在循环内填充队列。

我之前使用过 multiprocessing.Pool,它很有用,但与您已经使用 Queue 实现的功能相比,它提供的功能并不多。

【讨论】:

  • 感谢您的回答!但是你怎么能在循环之外真正创建进程,然后只更新它们给定的变量并在没有 p.join() 的情况下同步结果呢?因为join-function正在关闭进程,对吧?
  • p.join 只是等待进程退出,如果它调用 sys.exit 或从函数返回,它将执行此操作。在进程上调用 .start 后,它将与主进程并行执行。当工作进程调用 work_queue.get() 时,它们将阻塞,直到工作队列中有一个条目可供它们使用。当您在主进程中调用 work_queue.put() 时,您会将工作放入队列中,您可以继续这样做,直到它们完成为止。
  • 所以,很抱歉再次询问,但我明白了:我用 processes = [Process(...) for ...] 初始化主循环外的进程,并在循环内初始化进程,一旦我第一次填充了队列,只启动一次进程,我让它们通过队列工作,直到它为空。然后,在第二次迭代中,我再次填充队列。现在,这些进程将自动通过队列工作,而无需我以任何方式再次激活它们,它们将更新共享内存对象?
  • 是的,这听起来很正确。您将队列和共享内存阵列用作 IPC,因此您只需填满队列以让他们完成工作,然后他们会将结果放入阵列中。一旦你的主循环用完了要分发的工作,它必须以某种方式通知进程退出,然后它可以简单地将它们全部加入并退出。
【解决方案2】:

最终,这一切都归结为一个问题:是否可以在主 for 循环之外启动进程,并为每次迭代提供更新的变量,让它们处理数据,并收集新计算的来自所有流程的数据,而不必每次迭代都启动新流程?

【讨论】:

    猜你喜欢
    • 2011-01-22
    • 1970-01-01
    • 2022-01-13
    • 2017-02-22
    • 1970-01-01
    • 2019-03-04
    • 1970-01-01
    • 1970-01-01
    • 2013-04-29
    相关资源
    最近更新 更多