【问题标题】:Reading Files in Parallel mpi4py并行读取文件 mpi4py
【发布时间】:2022-06-24 17:03:45
【问题描述】:

我有一系列 n 个文件,我想使用 mpi4py 并行读取它们。每个文件都包含一个列向量,作为最终结果,我想获得一个包含所有单个向量的矩阵,如 X = [x1 x2 ... xn]。

在代码的第一部分,我创建了包含所有文件名的列表,并通过 scatter 方法将列表的一部分分发到不同的内核。

import numpy as np
import pandas as pd

from mpi4py import MPI

comm   = MPI.COMM_WORLD
rank   = comm.Get_rank()
nprocs = comm.Get_size()

folder     = "data/"    # Input directory
files      = []         # File List

# Create File List -----------------------------------------------------------
if rank == 0:

    for i in range(1,2000):
        filename = "file_" + str(i) + ".csv"
        files = np.append(files,filename)

    print("filelist complete!")

    # Determine the size of each sub task
    ave, res = divmod(files.size, nprocs)
    counts   = [ave + 1 if p < res else ave for p in range(nprocs)]

    # Determine starting and ending indices of each sub-task
    starts = [sum(counts[:p]) for p in range(nprocs)]
    ends   = [sum(counts[:p+1]) for p in range(nprocs)] 

    # Convert data into list of arrays
    fileList = [files[starts[p]:ends[p]] for p in range(nprocs)]

else:

    fileList = None 

fileList = comm.scatter(fileList, root = 0)

在这里我创建了一个矩阵 X 来存储向量。

# Variables Initialization ---------------------------------------------------

# Creation Support Vector
vector = pd.read_csv(folder + fileList[0])
vector = vector.values

vectorLength = len(vector)

# Matrix
X = np.ones((vectorLength, len(fileList)))
# ----------------------------------------------------------------------------

在这里,我读取了不同的文件并将列向量附加到矩阵 X。使用gather 方法,我将由单核计算的所有 X 矩阵存储到一个单个矩阵 X 中。由 Gather 方法产生的 X 矩阵是二维 numpy 数组的列表。作为最后一步,我将列表 X 重新组织成一个矩阵

# Reading Files -----------------------------------------------------------
for i in range(len(fileList)):

    data     = pd.read_csv(folder + fileList[i])
    data     = np.array(data.values)
        
    X[:,i] = data[:,0]


X = comm.gather(X, root = 0)


if rank == 0:

    X_tot = np.empty((vectorLength, 1))
    

    for i in range(nprocs):

        X_proc  = np.array(X[i])
        X_tot   = np.append(X_tot, X_proc, axis=1)

    X_tot = X_tot[:,1:]
    X     = X_tot
    del X_tot
    print("printing X", X)

代码运行良好。我在一个小数据集上对其进行了测试,并做了它应该做的事情。但是,我尝试在大型数据集上运行它,但出现以下错误:

X = comm.gather(X[:,1:], root = 0)
  File "mpi4py/MPI/Comm.pyx", line 1578, in mpi4py.MPI.Comm.gather
  File "mpi4py/MPI/msgpickle.pxi", line 773, in mpi4py.MPI.PyMPI_gather
  File "mpi4py/MPI/msgpickle.pxi", line 778, in mpi4py.MPI.PyMPI_gather
  File "mpi4py/MPI/msgpickle.pxi", line 191, in mpi4py.MPI.pickle_allocv
  File "mpi4py/MPI/msgpickle.pxi", line 182, in mpi4py.MPI.pickle_alloc
SystemError: Negative size passed to PyBytes_FromStringAndSize

这似乎是一个非常普遍的错误,但是我可以在串行模式下处理相同的数据而不会出现问题,或者在不使用所有 n 个文件的情况下并行处理。我还注意到只有 0 级核心似乎工作,而其他核心似乎什么都不做。

这是我第一个使用 mpi4py 的项目,如果代码不完美以及我犯了任何概念性错误,我深表歉意。

【问题讨论】:

    标签: python-3.x mpi4py


    【解决方案1】:

    这个错误通常发生在 MPI 进程之间传递的数据超过一定大小(我认为是 2GB)时。它应该在未来的 MPI 版本中得到修复,但现在,您可能不得不求助于一种解决方法,例如将数据存储在硬盘上并分别在每个进程中读取它...... 例如,请参见此处:https://github.com/mpi4py/mpi4py/issues/23

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-22
      • 2017-11-26
      • 2012-12-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多