【问题标题】:Multiprocessing an iterable in python在python中多处理一个可迭代的
【发布时间】:2017-07-25 02:25:05
【问题描述】:

我正在尝试拆分以下代码以允许在 python 中进行多处理,这对我来说确实是一项令人沮丧的任务 - 我是多处理的新手,并且已经阅读了文档和尽可能多的示例,但仍然没有找到了一种解决方案,可以同时在所有 cpu 内核上运行。

我想将可迭代对象分成四等份并让它并行计算测试。

我的单线程示例:

import itertools as it
import numpy as np

wmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
pmod = np.array([[0,1,2],[3,4,5],[6,7,3]])

plines1 = it.product(wmod[0],wmod[1],wmod[2])
plines2 = it.product(pmod[0],pmod[1],pmod[2])

check = .915
result = []

for count, (A,B) in enumerate(zip(plines1,plines2)):
    pass

    test = (sum(B)+10)/(sum(A)+12)
    if test > check:
        result = np.append(result,[A,B])
print('results: ',result)

我意识到这是一对 3x3 矩阵的一个非常小的例子,但我想将它应用到一对更大的矩阵上,计算大约需要一个小时。感谢您提供的任何建议。

【问题讨论】:

  • 好吧,一方面,我会将result = np.append(result, [A, B]) 带出你的循环内部。为什么你甚至在这里使用numpy 数组而不是list?对于数组与列表,这样的附加将非常低效。奇怪的是你也使用result = []...
  • 为了可扩展性和效率,我决定使用 numpy。正如我所说,3x3 矩阵仅用于示例。而for循环是一个迭代,它不会保留数据,除非我以某种方式检索它。
  • 是的,但numpy 不会神奇地让您的代码更具可扩展性。像这样使用numpy 会产生相反的效果。
  • 不管你对为什么 numpy 有什么看法,你的 cmets 对我的问题没有多大帮助:多处理。任何实际帮助将不胜感激。

标签: python python-3.x multiprocessing itertools iterable


【解决方案1】:

我建议使用队列来转储您的迭代。类似的东西:

import multiprocessing as mp
import numpy as np
import itertools as it


def worker(in_queue, out_queue):
    check = 0.915
    for a in iter(in_queue.get, 'STOP'):
        A = a[0]
        B = a[1]
        test = (sum(B)+10)/(sum(A)+12)
        if test > check:
            out_queue.put([A,B])
        else:
            out_queue.put('')

if __name__ == "__main__":
    wmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
    pmod = np.array([[0,1,2],[3,4,5],[6,7,3]])

    plines1 = it.product(wmod[0],wmod[1],wmod[2])
    plines2 = it.product(pmod[0],pmod[1],pmod[2])

    # determine length of your iterator
    counts = 26

    # setup iterator
    it = zip(plines1,plines2)

    in_queue = mp.Queue()
    out_queue = mp.Queue()

    # setup workers
    numProc = 2
    process = [mp.Process(target=worker,
                          args=(in_queue, out_queue), daemon=True) for x in range(numProc)]

    # run processes
    for p in process:
        p.start()

    results = []
    control = True

    # fill queue and get data
    # code fills the queue until a new element is available in the output
    # fill blocks if no slot is available in the in_queue
    for idx in range(counts):
        while out_queue.empty() and control:
            # fill the queue
            try:
                in_queue.put(next(it), block=True) 
            except StopIteration:
                # signals for processes stop
                for p in process:
                    print('stopping')
                    in_queue.put('STOP')
                control = False
                break
        results.append(out_queue.get(timeout=10))

    # wait for processes to finish
    for p in process:
        p.join()

    print(results)

    print('finished')

但是,您必须先确定任务列表的长度。

【讨论】:

  • 我试图在将所有代码实现到我的项目中之前理解它,但是当我尝试运行示例时,我收到一个错误,指出对象 int 不可迭代。
  • 你能指出他在抱怨哪一行吗?也许是 worker-function 导致了问题。尝试在您的test 中添加一个简单的打印命令
  • 我已经用一个工作示例或您的测试功能更正了我的答案。它在我的 python 3.5 上运行没有错误。
  • 感谢@Raja - 这更有意义。现在让我了解如何使用 pycuda 或 gnumpy 来执行一些分析工作
猜你喜欢
  • 2017-11-17
  • 2023-01-31
  • 2016-07-02
  • 1970-01-01
  • 1970-01-01
  • 2022-01-23
  • 2020-05-16
  • 2011-08-27
  • 2012-04-16
相关资源
最近更新 更多