【问题标题】:ValueError in MPI scatter method while using 2D list in Python-mpi4py在 Python-mpi4py 中使用 2D 列表时 MPI 分散方法中的 ValueError
【发布时间】:2023-03-17 13:27:01
【问题描述】:

我有一个 csv 文件,我将其读入 2D 列表,我想使用 MPI (mpi4py) 中的 scatter 方法将此列表的不同块发送到不同的处理元素,以按如下方式处理它们:

df = []
with open("data_tiny.csv") as csv_file:
   csv_reader = csv.reader(csv_file, delimiter=',')
for row in csv_reader:
   df.append(row)

recvbuf = [[""] * (len(df[0])) for _ in range(math.ceil(len(df)//size))]  
recvbuf= comm.scatter(df, root=0)
print('Rank: ',rank, ', recvbuf received: ',recvbuf)
for t in recvbuf[:]:
  if t[7] != 'o3':
    recvbuf.remove(t)
comm.gather(recvbuf, df, root=0)
if rank == 0:
   print('Rank: ',rank, ', recvbuf received: ',df)

我收到以下错误:

Traceback (most recent call last):
File "MPI_1.py", line 21, in <module>
   recvbuf= comm.scatter(df, root=0)
File "mpi4py/MPI/Comm.pyx", line 1267, in mpi4py.MPI.Comm.scatter
File "mpi4py/MPI/msgpickle.pxi", line 730, in mpi4py.MPI.PyMPI_scatter
File "mpi4py/MPI/msgpickle.pxi", line 119, in mpi4py.MPI.Pickle.dumpv
ValueError: expecting 4 items, got 54

错误说 scatter 需要 4 个项目,得到 54(df(2D 数组)的长度是 54,这就是为什么它说 scatter 得到 54)。我的问题是如何将 2d 列表传递给 scatter 方法(而不是通过使用 numpy)并在此处解决错误。

输入数据是一个9列54行的数据如:

 a,  aa, aaa, aaaa, aaaaa, aaaaaa, ab, abb, abbb
 a1,  aa1, aaa1, aaaa1, aaaaa1, aaaaaa1, ab1, abb1, abbb1
 a2,  aa2, aaa2, aaaa2, aaaaa2, aaaaaa2, ab2, abb2, abbb2
 a3,  aa3, aaa3, aaaa3, aaaaa3, aaaaaa3, ab3, abb3, abbb3
 .....
 .....

【问题讨论】:

    标签: python parallel-processing mpi openmpi mpi4py


    【解决方案1】:

    ValueError: 期望 4 个项目,得到 54 个

    发生这种情况是因为分散例程:

    recvbuf= comm.scatter(df, root=0)
    

    预计df 的长度与正在运行的进程数相同( comm.size)。

    由于您使用 4 个进程运行并且 df 有 54 个元素,因此您会收到错误消息。

    > ValueError: expecting 4 items, got 54
    

    要解决这个问题,您需要打包df,使其包含与进程数一样多的元素,其中每个元素都可以是一个数组,其中包含要发送到给定进程的元素。

    例如,假设您正在运行 4 个进程,而 df=[1,2,3,4,5,6,7,8] 您需要创建 df=[[1,2][3,4][5,6][7,8]]。其中 df[0] 将转到进程 0,df[1] 将转到进程 1,依此类推。

    一个可能的解决方案示例:

    import csv
    import math
    from mpi4py import MPI
    
    def split(a, n):
        k, m = divmod(len(a), n)
        return list(a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    
    df = []
    if rank == 0:
        with open("data_tiny.csv") as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            for row in csv_reader:
                df.append(row)
        df = split(df, size)
    
    recvbuf = comm.scatter(df, root=0)
    print(recvbuf)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-08-25
      • 1970-01-01
      • 2020-03-30
      • 1970-01-01
      • 1970-01-01
      • 2019-10-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多