【问题标题】:Parallelize Array Assignments in Python在 Python 中并行化数组赋值
【发布时间】:2019-08-26 05:20:30
【问题描述】:

我一直在尝试在 main 调用整个函数时并行化整个函数,或者在没有运气的情况下在下面看到的函数的任何部分,似乎我无法摆脱 TypeError: function object is not iterable。感谢任何建议。

from joblib import Parallel, delayed
num_cores = multiprocessing.cpu_count()
parallel = Parallel(n_jobs=num_cores)
from multiprocessing import Pool
p = Pool(4)

def kmean(layerW,cluster):
    weights1d = np.reshape(layerW,-1)
    print(np.shape(weights1d))

    #Parallelizing Here
    centroids,_ = parallel(delayed(kmeans(weights1d, cluster)))
    idxs,_      = parallel(delayed(vq(weights1d,centroids)))

    #Here, using Parallel
    weights1d_q = parallel(delayed([centroids[idxs[i]] for i in range(len(weights1d))]))

    #OR --- using pool instead
    weights1d_q = p.map([centroids[idxs[i]] for i in range(len(weights1d))])
    weights4d_q  = np.reshape(weights1d_q, np.shape(layerW))
    return weights4d_q

【问题讨论】:

  • 你打算并行化什么? kmeans 算法?
  • 这也可以,但我想这里的瓶颈是weights1d_q 在我尝试为每个元素分配质心值时分配结束
  • 你确定function object在说什么了吗?
  • 当你询问一个错误时,你应该明确它发生在哪里。通常我们通过引用回溯来做到这一点。也就是说,完整的错误消息。哪一行代码试图迭代作为函数的变量?我的猜测是,您在某个地方分配了一个变量 x = foo,而您应该执行 x = foo()x = foo(args)
  • 你能贴出我们可以复制粘贴的代码来看看问题吗?如果您阅读来自@user3666197 和joblib 文档joblib.readthedocs.io/en/latest/parallel.html 的答案,您会发现您当前的delayed 调用是错误的......

标签: python numpy parallel-processing parallelism-amdahl


【解决方案1】:

QTypeError: function object is not iterable@

为了TypeError:

TypeError 由于语法错误(对 joblib.Parallel( delayed( ... ) ... ) 的格式错误调用错误地遵守了记录的调用语法构造函数。

示例 1:正确调用:
此调用遵循记录在案的语法规范,直至最后一个点:

>>> from joblib import Parallel, delayed
>>> parallel = Parallel( n_jobs = -1 )
>>> import numpy as np
>>> parallel( delayed( np.sqrt ) ( i**2 ) for i in range( 10 ) )
#          ^  ^^^^^^^     ^^^^     ^^^^   |||
#          |  |||||||     ||||     ||||   vvv
#JOBS(-1):-+  |||||||     ||||     ||||   |||
#DELAYED:-----+++++++     ||||     ||||   |||
#FUN( par ):--------------++++     ||||   |||
#     |||                          ||||   |||
#     +++-FUN(signature-"decl.")---++++   |||
#     ^^^                                 |||
#     |||                                 |||
#     +++-<<<-<iterator>-<<<-<<<-<<<-<<<--+++
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

生成的结果确认调用完全合规且可解释。

示例 2:错误调用:

>>> from joblib import Parallel, delayed
>>> parallel = Parallel( n_jobs = -1 )
>>> import numpy as np
>>> parallel( delayed( np.sqrt( 10 ) ) )          #### THIS SLOC IS KNOWINGLY WRONG
#          ^  ^^^^^^^     ^^^^(????)  ????   ???  ####
#          |  |||||||     ||||        ||||   vvv  ####
#JOBS(-1):-+  |||||||     ||||        ||||   |||  ####
#DELAYED:-----+++++++     ||||        ||||   |||  #### DELAYED( <float64> )
#FUN( par ):--------------++++        ||||   |||  #### GOT NO CALLABLE FUN( par ) 
#     |||                             ||||   |||  ####        BUT A NUMBER
#     +++-FUN(signature-"decl.")------++++   |||  ####        FUN( signature )
#     ^^^                                    |||  ####        NOT PRESENT
#     |||                                    |||  ####        AND FEEDER
#     +++-<<<-<iterator>-<<<-<<<-<<<-<<<-<<<-+++  #### <ITERATOR> MISSING
#                                                 ####
Traceback (most recent call last):                ####   FOR DETAILS, READ THE O/P
  File "<stdin>", line 1, in <module>             ####   AND EXPLANATION BELOW
  File ".../lib/python3.5/site-packages/joblib/parallel.py", line 947, in __call__
    iterator = iter(iterable)
TypeError: 'function' object is not iterable

结果证实,O/P 使用的语法与记录在案的 joblib.Parallel( delayed(...) ... ) 不兼容
Q.E.D.


补救措施:

遵循joblib.Parallel( delayed( ... ) ... ) 记录的语法:

#entroids, _ = parallel( delayed( kmeans(weights1d, cluster)))
#                                 ^^^^^^(..................)
#                                 ||||||(..................)
#THIS-IS-NOT-A-CALLABLE-BUT-VALUE-++++++(..................)
#
centroids, _ = parallel( delayed( kmeans ) ( weights1d, cluster ) for ... )
#                                 ^^^^^^     ^^^^^^^^^^^^^^^^^^   |||||||
#                                 ||||||     ||||||||||||||||||   vvvvvvv
# CALLABLE FUN()------------------++++++     ||||||||||||||||||   |||||||
#          FUN( <signature> )----------------++++++++++++++++++   |||||||
#               ^^^^^^^^^^^                                       |||||||
#               |||||||||||                                       |||||||
#               +++++++++++------------<<<--feeding-<iterator>----+++++++

最好的第一步:

是重新阅读joblib.Parallel的设计方式和使用模式的文档细节,以便更好地熟悉该工具:

joblib.Parallel( n_jobs       = None,   # how many jobs will get instantiated
                 backend      = None,   # a method, how these will get instantiated
                 verbose      = 0,
                 timeout      = None,
                 pre_dispatch = '2 * n_jobs',
                 batch_size   = 'auto',
                 temp_folder  = None,
                 max_nbytes   = '1M',
                 mmap_mode    = 'r',
                 prefer       = None,   # None | { ‘processes’, ‘threads’ }
                 require      = None    # None | ‘sharedmem’ ~CONSTRAINTS backend
                 )

接下来,人们可能会掌握一些琐碎的例子(并进行实验并将其扩展到自己的预期用例):

      Parallel(  n_jobs = 2 ) ( delayed( sqrt ) ( i ** 2 ) for i in range( 10 ) )
      #          ^              ^^^^^^^  ^^^^     ^^^^^^   |||
      #          |              |||||||  ||||     ||||||   vvv
      #JOBS:-----+              |||||||  ||||     ||||||   |||
      #DELAYED:-----------------+++++++  ||||     ||||||   |||
      #FUN( par ):-----------------------++++     ||||||   |||
      #     |||                                   ||||||   |||
      #     +++--FUN(-signature-"declaration"-)---++++++   |||
      #     ^^^                                            |||
      #     |||                                            |||
      #     +++-<<<-<iterator>-<<<-<<<-<<<-<<<-<<<-<<<-<<<-+++

      Parallel(  n_jobs = -1 ) ( 
                 delayed( myTupleConsumingFUN ) ( # aFun( aTuple = ( a, b, c, d ) )
                           aTupleOfParametersGeneratingFUN( i ) )
                 for                                        i in range( 10 )
                 )

下一步:尝试了解使用n_jobs 实例化的成本和限制

joblib 的默认后端将在 isolated Python 进程中运行每个函数调用,因此 它们不能改变常见的主程序中定义的 Python 对象

但如果并行函数确实需要依赖线程的共享内存语义,则应使用require='sharedmem' 明确说明

请记住,从性能的角度来看,依赖共享内存语义可能不是最佳的,因为对共享 Python 对象的并发访问将遭受锁争用。

使用基于线程的后端允许“共享”,但这意味着这样做的巨大成本 - 线程重新引入 GIL 步进,这将重新[SERIAL] - 以 GIL 锁步方式将代码执行流程重新转换为一个接一个接一个。对于产生比原始纯[SERIAL] 代码更差的性能的计算密集型处理(虽然此模式有助于延迟屏蔽用例,其中等待网络响应可能允许线程释放 GIL 锁并让其他线程继续工作)

有一些步骤可以实现,以便使基于流程的单独计算能够传达这种需求,但是,这需要一些附加成本。

计算密集型问题必须平衡对最终性能的需求(使用更多内核),但要记住只有一个隔离(拆分)的工作单元和最小的参数传输和结果返回的附加成本,所有与利用 joblib.Parallel 可用形式的 just-[CONCURRENT] 流程调度的错误设计意图相比,这很容易花费更多。

了解更多details on joblib.Parallel

更多details on add-on costs and atomicity-of-work对并行加速的影响

【讨论】:

  • 感谢@user3666197 的回复,但没有一个与我的问题有关!
  • @Amir 恕我直言,经过 40 多年的设计、分析和剖析 HPC 级并行计算和分布式计算系统,我拥有一批人手——根据经验敢于声称所有这些方面对于使代码既无错误又尽可能接近 HPC 级性能很重要。 随意用可重现的 MVC 问题公式表达其他论点(到目前为止,这里没有一个作为基准),但投票否决(惩罚一个场景steps ) 与 StackOverflow 社区网络礼节不兼容。
  • 感谢您抽出宝贵时间提供信息丰富的答案,但您的答案文本并未反映我的问题,即为什么上述函数既不能在 def 函数级别也不能在各个细分市场。
  • @Amir 你错了。发布完全反映错误,这是代码失败的根本原因。[最好的第一步][NEXT] 段落中提供的演示提供了充分的证据最重要的是,如果遵循它(而不是反对和进一步反对),它将引导您迈向工作代码。关于性能上限和 RAM 限制的评论是可能进一步塑造您的并行化技能的奖励,但步骤以上是删除原始代码示例中主要错误的直接方法。不遵守记录的语法
猜你喜欢
  • 1970-01-01
  • 2011-01-12
  • 2014-07-12
  • 1970-01-01
  • 1970-01-01
  • 2012-02-08
  • 1970-01-01
  • 2021-11-08
  • 1970-01-01
相关资源
最近更新 更多