【问题标题】:multiprocessing of function with constant and iterable arguments具有常量和可迭代参数的函数的多处理
【发布时间】:2013-09-16 17:46:11
【问题描述】:

你好 stackoverflow 用户,

我试图查找这个但找不到答案:我本质上喜欢并行处理一个函数(独立进程!)并且该函数有一个可迭代的(x几个常量参数(kd)。这是一个非常简单的例子:

from multiprocessing import *

def test_function(args):
    k = args[0]
    d = args[1]
    x = args[2]
    del args

    return k*x + d

if __name__ == '__main__':
    pool = Pool(processes=2)

    k = 3.
    d = 5.

    constants = [k,d]
    xvalues = range(0,10)
    result = [pool.apply_async(test_function, constants.append(i)) for i in xvalues]

    output = [r.get() for r in result]

    print output
    #I expect [5.0, 8.0, 11.0, 14.0, 17.0, 20.0, 23.0, 26.0, 29.0, 32.0]

这给了我以下错误信息:

Traceback (most recent call last):
  File "test_function.py", line 23, in <module>
    output = [r.get() for r in result]
  File "C:\Program Files\Python2.7\lib\multiprocessing\pool.py", line 528, in get
    raise self._value
TypeError: test_function() argument after * must be a sequence, not NoneType

所以我的问题是:

此错误消息的实际含义是什么?

如何修复它以获得预期的结果(参见代码示例的最后一行)?

对于调用 apply_sync 的线路是否有更好/工作/优雅的方式?

仅供参考:我是 python 新手,请多多包涵,如果我的帖子需要更多详细信息,请告诉我。

非常感谢您的任何建议!

【问题讨论】:

    标签: python python-2.7 multiprocessing


    【解决方案1】:

    此错误消息的实际含义是什么?

    append 方法返回的值始终是None,因此在做的时候:

    pool.apply_async(test_function, constants.append(i))
    

    您正在调用 pool.apply_asynch 并使用 None 作为 args 参数,但 apply_asynch 需要一个 iterable 作为参数。 apply_asynch 正在做的事情叫做tuple-unpacking

    如何解决它以获得预期的结果?

    要实现预期的输出,只需将i 连接到常量:

    pool.apply_asynch(test_function, (constants + [i],))
    

    有没有更好/工作/优雅的方式来调用 应用同步?

    请注意,您必须将所有参数包装到一个元素元组中,因为您的 test_function 接受单个参数。 你可以这样修改它:

    def test_function(k, d, x):
        # etc
    

    然后简单地使用:

    pool.apply_asynch(test_function, constants + [i])
    

    apply_asynch 将使用tuple-unpacking 自动将args 解包到函数的三个参数中。 (仔细阅读Pool.apply 和朋友的文档)。


    有没有更好/工作/优雅的方式来调用 应用同步?

    正如 Silas 所指出的,您应该使用 Pool.mapPool.map_asynch 方法,而不是使用 Pool.apply_asynch 到值列表,它们会为您做到这一点。

    例如:

    results = pool.map(test_function, [(constants + [i],) for i in xvalues])
    

    但请注意,在这种情况下,test_function 必须接受单个参数,因此您必须手动解压缩常量和 x,就像您在问题中所做的那样。


    另外,作为一般建议:

    • 在您的test_function 中绝对不需要del args。它只会减慢函数的执行速度(非常少)。请谨慎使用del,仅在需要时使用。
    • 您可以使用以下语法,而不是手动分配元组中的元素:

      k, d, x = args
      

      相当于(可能稍慢):

      k = args[0]
      d = args[1]
      x = args[2]
      
    • 预计会大大减速使用multiprocessing 调用这些简单的函数。通信和同步进程的成本非常高,因此您必须避免调用简单函数,并且尽可能尝试“分块”工作(例如,不要单独发送每个请求,而是将 100 个请求的列表发送给单个工作人员论点)。

    【讨论】:

    • 无论如何,pool.mappool.map_asynch 可能会更好地为他服务。
    • 感谢您的精彩回复和 Silas 的补充!我包括了所有建议,但不得不将 pool.map 行修改为以下内容:result = pool.map(test_function, [constants + [i] for i in xvalues]) 并且带有.get() 的后续行实际上是不必要的,打印result 给出了正确的答案。也感谢您的一般性建议。
    【解决方案2】:

    constants.append(i) 返回 None ,你应该先附加值,然后使用 constants 作为第二个参数。

    >>> constants = []
    >>> i = 2
    >>> bug_value = constants.append(i)
    >>> constants
    [2]
    >>> bug_value is None
    True
    >>> 
    

    确实使用result = [pool.apply_async(test_function, constants+ [i]) for i in xvalues]

    list + list 追加两个列表并返回结果列表。

    【讨论】:

    • 感谢您的简洁示例!我的问题是我期望来自constants.append.(i) 的连接。
    猜你喜欢
    • 2020-04-16
    • 2013-03-14
    • 2023-03-17
    • 2022-06-14
    • 2018-03-29
    • 1970-01-01
    • 2022-10-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多