【问题标题】:Difficulties with putting numpy array in multiprocessing queue将 numpy 数组放入多处理队列的困难
【发布时间】:2013-07-05 00:25:09
【问题描述】:

我在输入到多处理队列的 numpy 数组中有参数集,但在工作进程中收到它们时会出现乱码。这是我的代码来说明我的问题。

import numpy as np
from multiprocessing import Process, Queue

NUMBER_OF_PROCESSES = 2

def worker(input, output):
    for args in iter(input.get, 'STOP'):
        print('Worker receives: ' + repr(args))
        id, par = args
        # simulate a complex task, and return result
        result = par['A'] * par['B']
        output.put((id, result))

# Define parameters to process
parameters = np.array([
    (1.0, 2.0),
    (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
for id, par in enumerate(parameters):
    obj = ('id_' + str(id), par)
    print('Submitting task: ' + repr(obj))
    task_queue.put(obj)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get unordered results
results = {}
for i in range(len(parameters)):
    id, result = done_queue.get()
    results[id] = result

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

print('results: ' + str(results))

在 64 位 CentOS 计算机上使用 numpy 1.4.1 和 Python 2.6.6,我的输出是:

Submitting task: ('id_0', (1.0, 2.0))
Submitting task: ('id_1', (3.0, 3.0))
Worker receives: ('id_0', (2.07827093387802e-316, 6.9204740511333381e-310))
Worker receives: ('id_1', (0.0, 1.8834810076011668e-316))
results: {'id_0': 0.0, 'id_1': 0.0}

如图所示,带有numpy记录数组的元组在提交任务时状态良好,但worker收到参数时出现乱码,结果不正确。我在multiprocessing documentation 中读到“代理方法的参数是可腌制的”。据我所知,numpy 数组是完全可以挑选的:

>>> import pickle
>>> for par in parameters:
...     print(pickle.loads(pickle.dumps(par)))
...     
(1.0, 2.0)
(3.0, 3.0)

我的问题是为什么工人没有正确接收参数?我怎样才能将一行 numpy 记录数组传递给工作人员?

【问题讨论】:

    标签: python numpy multiprocessing pickle


    【解决方案1】:

    numpy 数组应该是可腌制的(我认为),但在这里你实际上是在处理 numpy.void 实例,我不知道为什么,似乎不能腌制。

    如果你这样做:

    for par in parameters:
        print(type(par))
        print pickle.loads(pickle.dumps(par))
    

    你得到:

    <type 'numpy.void'>
    (-1.3918046672290164e-41, -1.3918046679677054e-41)
    <type 'numpy.void'>
    (-1.3918046672290164e-41, -1.3918046679677054e-41)
    

    解决此问题的一种方法是应用 parameters = parameters.reshape([-1, 1]) 将 (N,) 数组转换为 (N, 1) 数组。这样,当您循环参数时,您将获得大小为 1 的数组,希望能够很好地腌制。希望对您有所帮助。

    【讨论】:

    【解决方案2】:

    我和你遇到了同样的问题,但我的情况和你有点不同。

    最初,我在子进程的每个循环中输出一个数字,并将它们组合成 numpy.dnarray。最后,我将数组传递给队列,但运行 p.join() 后我的主进程无法启动。

    旧代码如下所示

    # subprocess
    for i in range(n):
        array[i] = data[i]
    queue.put(array)
    # main process
    queue.get()
    

    不过,我换了另一种方式来处理这样的问题

    # subprocess
    for i in range(n):
        queue.put((i, data[i]))
    # main process
    for i in range(n):
        while queue.empty():
            i, data = queue.get()
            array[i] = data
    

    简单地说,我只是将我的数据分成更小的部分(数据,位置)并将它们传递给队列,主进程接收数据同步。 我希望它会有所帮助

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-21
      • 2013-09-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多