【问题标题】:How to scatter and gather a list of python of objects in mpi4py如何在 mpi4py 中分散和收集对象的 python 列表
【发布时间】:2018-06-19 07:56:35
【问题描述】:

我有一个包含 100,000 个 Python 对象 的列表,我想在 mpi4py 中分散和收集这些对象。

当我尝试使用 8 个处理器时,我得到:

SystemError: 负大小传递给 PyBytes_FromStringAndSize

关于散射。

当我尝试使用 64 个处理器时,我得到了同样的错误,但在集合中。

当我尝试从列表中创建一个对象数组并使用 Gather 和 Scatter 时,我收到一个错误,基本上说明数组的 dtype 不能是对象。

有什么办法可以让这个工作吗?或者我可以使用 MPI 以外的任何其他东西?
我在一台 8 节点、64 ppn 的计算机上运行它。

【问题讨论】:

  • 您能否提供一个最小代码示例以便我们重现错误?

标签: python parallel-processing cluster-computing hpc mpi4py


【解决方案1】:

使用 scatter 和gather,拆分包含 100000 个项目的 numpy 数组的示例。

import numpy as np
from mpi4py import MPI
from pprint import pprint
comm = MPI.COMM_WORLD

pprint("-" * 78)
pprint(" Running on %d cores" % comm.size)
pprint("-" * 78)

N = 100000
my_N = N // 8

if comm.rank == 0:
    A = np.arange(N, dtype=np.float64)
else:
    A = np.empty(N, dtype=np.float64)

my_A = np.empty(my_N, dtype=np.float64)

# Scatter data 
comm.Scatter([A, MPI.DOUBLE], [my_A, MPI.DOUBLE])

pprint("After Scatter:")
for r in range(comm.size):
    if comm.rank == r:
        print("[%d] %s" % (comm.rank, len(my_A)))
    comm.Barrier()

# Allgather data into A
comm.Allgather([my_A, MPI.DOUBLE], [A, MPI.DOUBLE])

pprint("After Allgather:")
for r in range(comm.size):
    if comm.rank == r:
        print("[%d] %s" % (comm.rank, len(A)))
    comm.Barrier()

您也可以查看scatterv and gatherv、更多examples herehere

【讨论】:

    【解决方案2】:

    我不确定这是不是答案,我也不确定您是否还在寻找答案,但是...

    所以你有 100,000 个 python 对象。如果这些对象是常规数据(数据集),而不是某个类的实例,则将数据作为 json 字符串传递。像这样的:

    #!/usr/bin/env python
    
    import json
    import numpy as np
    from mpi4py import MPI
    
    
    comm = MPI.COMM_WORLD
    
    if comm.rank == 0:
        tasks = [
            json.dumps( { 'a':1,'x':2,'b':3 } ),
            json.dumps( { 'a':3,'x':1,'b':2 } ),
            json.dumps( { 'a':2,'x':3,'b':1 } )
        ]
    else:
        tasks = None
    
    
    # Scatter paramters arrays
    unit = comm.scatter(tasks, root=0)
    
    p = json.loads(unit)
    print "-"*18
    print("-- I'm rank %d in %d size task" % (comm.rank,comm.size) )
    print("-- My paramters are: {}".format(p))
    print "-"*18
    
    comm.Barrier()
    
    calc = p['a']*p['x']**2+p['b']
    
    # gather results
    result = comm.gather(calc, root=0)
    # do something with result
    
    if comm.rank == 0:
        print "the result is ", result
    else:
        result = None
    

    注意,如果您只有 8 个节点/核心,则必须在 tasks 列表中创建 8 条记录,并依次分散和收集所有 100,000 个数据集。如果您的所有数据集都在ALLDATA 列表中,代码可能如下所示:

    def calc(a=0,x=0,b=0):
        return a*x**2+b
    
    if comm.rank == 0: collector = []
    for xset in zip(*(iter(ALLDATA),) * comm.size):
        task = [ json.dumps(s) for s in xset ]
        comm.Barrier()
        unit = comm.scatter(task if comm.rank == 0 else None, root=0)
        p = json.loads(unit)
        res = json.dumps( calc(**p) )
        totres = comm.gather(res, root=0)
        if comm.rank == 0:
            collector += [ json.loads(x) for x in  totres  ]
    
    
    
    if comm.rank == 0:
        print "the result is ", collector
    

    【讨论】:

      猜你喜欢
      • 2016-06-30
      • 2021-08-21
      • 2023-03-17
      • 2018-03-27
      • 1970-01-01
      • 2023-03-22
      • 2018-02-14
      • 2021-06-14
      • 2013-02-28
      相关资源
      最近更新 更多