【问题标题】:How to get multiple return objects from a function used in multiprocessing?如何从多处理中使用的函数获取多个返回对象?
【发布时间】:2015-02-04 17:53:04
【问题描述】:

这可能是一个非常简单的问题,但绝对让我筋疲力尽。 为了使用多处理,我编写了以下代码。主函数创建两个进程,它们都使用相同的函数,称为 prepare_input_data() 但处理不同的输入数据集。此函数必须为每个输入返回多个对象和值,以便在代码的后续步骤中使用(此处不包括在内)。

我想要的是获得多个值或对象作为我在多处理中使用的函数的返回。

def prepare_input_data(inputdata_address,temporary_address, output):
    p=current_process()
    name = p.name
    data_address = inputdata_address 
    layer = loading_layer(data_address)    

    preprocessing_object = Preprocessing(layer)
    nodes= preprocessing_object.node_extraction(layer)
    tree = preprocessing_object.index_nodes()
    roundabouts_dict , roundabouts_tree= find_roundabouts(layer.address, layer, temporary_address)

    #return layer, nodes, tree, roundabouts_dict, roundabouts_tree
    #return [layer, nodes, tree, roundabouts_dict, roundabouts_tree]
    output.put( [layer, nodes, tree, roundabouts_dict, roundabouts_tree])


if __name__ == '__main__':
    print "the data preparation in multi processes starts here"
    output=Queue() 
    start_time=time.time()
    processes =[]
    #outputs=[]
    ref_process = Process(name ="reference", target=prepare_input_data, args=("D:/Ehsan/Skane/Input/Skane_data/Under_processing/identicals/clipped/test/NVDB_test3.shp", "D:/Ehsan/Skane/Input/Skane_data/Under_processing/temporary/",output)) 
    cor_process = Process(name ="corresponding", target=prepare_input_data, args=("D:/Ehsan/Skane/Input/Skane_data/Under_processing/identicals/clipped/test/OSM_test3.shp", "D:/Ehsan/Skane/Input/Skane_data/Under_processing/temporary/",output))
    #outputs.append(ref_process.start)
    #outputs.append(cor_process.start)
    ref_process.start
    cor_process.start
    processes.append(ref_process)
    processes.append(cor_process)
    for p in processes:
        p.join()

    print "the whole data preparation took ",time.time()-start_time
    results={}
    for p in processes:
        results[p.name]=output.get()
    ########################
    #ref_info = outputs[0]
    # ref_nodes=ref_info[0]

上一个错误 当我使用 return 时,ref_info[0] 有 Nonetype。

错误: 根据here 的答案,我将其更改为传递给函数的 Queueu 对象,然后我使用 put() 添加结果并使用 get() 检索它们以进行进一步处理。

Traceback (most recent call last):
File "C:\Python27\ArcGISx6410.2\Lib\multiprocessing\queues.py", line 262, in _feed
    send(obj)
UnpickleableError: Cannot pickle <type 'geoprocessing spatial reference object'> objects

您能帮我解决如何从多处理中的函数返回多个值吗?

【问题讨论】:

  • 你好像忘了问问题!
  • @MarcusMüller:现在可能更清楚了。
  • ... 作为函数的返回... 但您目前没有返回 anything,也没有在寻找在从任何地方返回的值。你能写一个最小且独立的例子来展示你真正想要做什么吗?您发布的大部分代码似乎与多处理问题无关。
  • @Useless:我的函数中有返回值,其语法为 return,但后来我将其更改为 Queue.put(),因为我不断收到 Nonetype。
  • 这个问题绝对不是很清楚......但看看你的错误,你似乎将一个无法“腌制”的对象(在python中序列化)传递给多处理队列。您可以传递一个可选值(例如字符串、整数),或者...*讨厌的解决方案*将其设置在全局变量中。

标签: python return multiprocessing pickle


【解决方案1】:

使用共享状态进行并行编程是一条崎岖不平的道路,即使是有经验的程序员也会出错。一种对初学者更友好的方法是复制数据。这是在子流程之间移动数据的唯一方法(不太正确,但这是一个高级主题)。

引用https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes,您需要设置一个 multiprocessing.Queue 来填充您为每个子流程返回的数据。之后,您可以将要读取的队列传递到下一个阶段。

对于多个不同的数据集,例如您的层、节点、树等,您可以使用多个队列来区分每个返回值。为每个人使用一个队列可能看起来有点混乱,但它简单易懂且安全。

希望对您有所帮助。

【讨论】:

  • 这没有解决如何返回多个值...这是 OP 要求的。
  • 每个值都是一个队列。如果那不是返回多个值,那么我不明白这个问题。我将编辑我的答案以澄清这一点。
【解决方案2】:

如果你使用 jpe_types.paraleel 的 Process 它将像这样返回 Processes 目标函数的返回值

import jpe_types.paralel


def fun():
    return 4, 23.4, "hi", None

if __name__ == "__main__":
    
    p = jpe_types.paralel.Process(target = fun)
    p.start()
    print(p.join())

否则你可以

import multiprocessing as mp

def fun(returner):
    returner.send((1, 23,"hi", None))

if __name__ == "__main__":
    processes = []
    for i in range(2):
        sender, recever = mp.Pipe()
        p = mp.Process(target = fun, args=(sender,))
        p.start()
        processes.append((p, recever))

    resses = []

    for p, rcver in processes:
        p.join()
        resses.append(rcver.recv())
    print(resses)

使用连接将保证返回不会被乱码

【讨论】:

    【解决方案3】:

    如果您希望从multiprocessing 获得多个返回值,那么您可以这样做。这是一个简单的例子,首先是串行python,然后是multiprocessing

    >>> a,b = range(10), range(10,0,-1)
    >>> import math
    >>> map(math.modf, (1.*i/j for i,j in zip(a,b)))
    [(0.0, 0.0), (0.1111111111111111, 0.0), (0.25, 0.0), (0.42857142857142855, 0.0), (0.6666666666666666, 0.0), (0.0, 1.0), (0.5, 1.0), (0.3333333333333335, 2.0), (0.0, 4.0), (0.0, 9.0)]
    >>> 
    >>> from multiprocessing import Pool
    >>> res = Pool().imap(math.modf, (1.*i/j for i,j in zip(a,b)))
    >>> for i,ai in enumerate(a):
    ...   x,y = res.next()
    ...   print("{x},{y} = modf({u}/{d})").format(x=x,y=y,u=ai,d=b[i])
    ... 
    0.0,0.0 = modf(0/10)
    0.111111111111,0.0 = modf(1/9)
    0.25,0.0 = modf(2/8)
    0.428571428571,0.0 = modf(3/7)
    0.666666666667,0.0 = modf(4/6)
    0.0,1.0 = modf(5/5)
    0.5,1.0 = modf(6/4)
    0.333333333333,2.0 = modf(7/3)
    0.0,4.0 = modf(8/2)
    0.0,9.0 = modf(9/1)
    

    因此,要从带有multiprocessing 的函数的返回中获取多个值,您只需要有一个返回多个值的函数……您只需将值作为元组列表返回。

    multiprocessing 的主要问题,正如您从错误中看到的那样……是大多数函数不序列化。所以,如果你真的想做你想做的事……我强烈建议你使用pathos(如下所述)。 multiprocessing 的最大障碍是您作为目标传递的函数必须是可序列化的。您可以对 prepare_input_data 函数进行一些修改……首先是确保它被封装。如果您的函数没有完全封装(例如,它在自己的范围之外进行名称引用查找),那么它可能不会使用pickle 腌制。这意味着,您需要在目标函数中包含所有导入,并通过函数输入传递任何其他变量。您看到的错误 (UnPicklableError) 是由于您的目标函数及其依赖项无法序列化 - 而不是您无法从 multiprocessing 返回多个值。

    虽然我会封装目标函数作为一种良好的做法,但它可能有点乏味,并且可能会减慢您的代码速度。我还建议您将代码转换为使用dillpathos.multiprocessing——dill 是一个高级序列化程序,可以腌制几乎所有的python 对象,pathos 提供了一个使用dillmultiprocessing 分支。这样,您可以在pipe(即apply)或map 中传递大多数python 对象,该对象可从Pool 对象中获得,而不必担心过于费力地重构代码以确保清晰老picklemultiprocessing可以搞定。

    另外,我会使用异步 map 而不是执行您在上面执行的操作。 pathos.multiprocessing 能够在 map 函数中接受多个参数,因此您不需要像上面所做的那样将它们包装在元组 args 中。使用异步map,接口应该更简洁,如果需要,您可以返回多个参数……只需将它们打包在一个元组中。

    这里有一些例子可以证明我上面提到的内容。

    返回多个值:

    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> def addsub(x,y):
    ...   return x+y, x-y
    ... 
    >>> a,b = range(10),range(-10,10,2)
    >>> res = Pool().imap(addsub, a, b)
    >>> 
    >>> for i,ai in enumerate(a):
    ...   add,sub = res.next()
    ...   print("{a} + {b} = {p}; {a} - {b} = {m}".format(a=ai,b=b[i],p=add,m=sub))
    ... 
    0 + -10 = -10; 0 - -10 = 10
    1 + -8 = -7; 1 - -8 = 9
    2 + -6 = -4; 2 - -6 = 8
    3 + -4 = -1; 3 - -4 = 7
    4 + -2 = 2; 4 - -2 = 6
    5 + 0 = 5; 5 - 0 = 5
    6 + 2 = 8; 6 - 2 = 4
    7 + 4 = 11; 7 - 4 = 3
    8 + 6 = 14; 8 - 6 = 2
    9 + 8 = 17; 9 - 8 = 1
    >>> 
    

    异步地图: Python multiprocessing - tracking the process of pool.map operation

    pathos: Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()

    pathos: What can multiprocessing and dill do together?

    我们仍然无法运行您的代码……但如果您发布可以运行的代码,则可能更有可能帮助您编辑您的代码(使用pathos fork 和异步map 或其他方式)。

    仅供参考:pathos 的发布有点晚了(即迟到了),所以如果你想尝试一下,最好在这里获取代码:https://github.com/uqfoundation

    【讨论】:

    • OP 显然是初学者,无法掌握控制并发和multiprocessing 组件的基本概念。在这种情况下,我认为建议使用两个第三方包并不是特别有帮助。更好地从教学角度解释如何从概念上解决这个非常基本的问题;然后提出标准库手段,供实际实施。
    • 我一般同意你的看法。但是,上述软件包与我所建议的具有相同的界面(或者非常接近,如果不完全的话)......我已经在 b/c 部分中建议了这些软件包,我是作者,部分原因是它们更干净且在概念上对于初学者来说比 python 标准库包更容易使用。
    猜你喜欢
    • 2010-12-13
    • 1970-01-01
    • 1970-01-01
    • 2011-04-04
    • 2018-11-24
    • 2021-11-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多