【问题标题】:How to improve speed of MPI I/O on large number of cores?如何在大量内核上提高 MPI I/O 的速度?
【发布时间】:2014-06-04 11:47:29
【问题描述】:

我一直在尝试在大量内核上使用 MPI I/O 运行代码。每个核心读取和写入单个文件(所有核心都相同)所需的时间随着使用的核心数量而增加。我目前正在使用 512 个内核,这个问题使我的项目不可行。但是,即使在 8 个内核上运行,也会出现问题;然后读取文件中的第一个实数大约需要 0.2 秒。在 32 个内核上,写入一个实数需要 30 秒以上。我在这里运行它:https://www.msi.umn.edu/hpc/itasca。下面的简单代码正好产生了这个问题(这里可能不需要计算文件中元素的数量,但在我的实际代码中是必需的):

PROGRAM MAIN

USE MPI
IMPLICIT NONE

! INITIALIZING VARIABLES   

REAL(8) :: A, B
INTEGER :: COUNT_IO, i, j, ST, GO, tag, t, nb_bytes, N, d_each, d_start, d_end, NN
REAL(8) :: time_start, time_end

! VARIABLES RELATED TO MPI

INTEGER :: ierror  ! returns error messages from the mpi subroutines 
INTEGER :: rank    ! identification number of each processor
INTEGER :: nproc   ! number of processors
INTEGER, DIMENSION(mpi_status_size):: status
INTEGER(kind= MPI_OFFSET_KIND ) :: offset
INTEGER :: fh  ! file handle

! EXECUTABLE

    ! INITIALIZE THE MPI ENVIRONMENT

    CALL MPI_INIT(ierror)                           ! initialize MPI 
    CALL MPI_COMM_RANK(MPI_COMM_WORLD,rank,ierror)  ! obtain rank for each node
    CALL MPI_COMM_SIZE(MPI_COMM_WORLD,nproc,ierror) ! obtain the number of nodes
    CALL MPI_TYPE_SIZE(MPI_REAL8,nb_bytes,ierror)

    CALL MPI_FILE_OPEN (MPI_COMM_WORLD,"file.dat",MPI_MODE_RDWR+MPI_MODE_UNIQUE_OPEN,MPI_INFO_NULL,fh,ierror)    

    NN = 2048

    DO d_each=1,NN
        IF (d_each*nproc>=NN) EXIT
    END DO
    d_start = rank*d_each+1 
    d_end   = MIN((rank+1)*d_each,NN)

    DO t = d_start,d_end

        ! READING ONE THREAD AT A TIME

        tag = 1

        GO = 0

        IF (rank .gt. 0) THEN
            CALL MPI_RECV (GO,1,MPI_INTEGER,rank-1,tag, MPI_COMM_WORLD ,status,ierror)
        ENDIF

        time_start = MPI_WTIME()

        i  = 0
        ST = 0
        COUNT_IO = 0

        DO WHILE ((i .lt. 100000) .AND. (ST .eq. 0))
            i = i+1
            offset = nb_bytes*(i-1)
            CALL MPI_FILE_READ_AT (fh,offset,A,1,MPI_REAL8,status,ierror)
            IF (status(1) .eq. 0) THEN
                COUNT_IO = i
                ST = 1
            ELSE
                COUNT_IO = 0
            END IF        
        ENDDO

        N = (COUNT_IO - 1)

        IF (N .gt. 0) THEN

            offset = 0                      
            CALL MPI_FILE_READ_AT (fh,offset,B,1,MPI_REAL8,status,ierror)

        ENDIF

        time_end = MPI_WTIME()

        PRINT *, 'My rank is', rank, 'Time for read  =',time_end-time_start 

        GO = 1    
        IF (rank .lt. nproc-1) THEN
            CALL MPI_SEND (GO,1, MPI_INTEGER ,rank+1,tag, MPI_COMM_WORLD ,ierror)
        ENDIF

        CALL MPI_BARRIER(MPI_COMM_WORLD,ierror)

        ! WRITING ONE THREAD AT A TIME

        tag = 2

        GO = 0

        IF (rank .gt. 0) THEN
            CALL MPI_RECV (GO,1,MPI_INTEGER,rank-1,tag, MPI_COMM_WORLD ,status,ierror)
        ENDIF

        time_start = MPI_WTIME()

        i  = 0
        ST = 0
        COUNT_IO = 0

        DO WHILE ((i .lt. 100000) .AND. (ST .eq. 0))
            i = i+1
            offset = nb_bytes*(i-1)
            CALL MPI_FILE_READ_AT (fh,offset,A,1,MPI_REAL8,status,ierror)
            IF (status(1) .eq. 0) THEN
                COUNT_IO = i
                ST = 1
            ELSE
                COUNT_IO = 0
            END IF        
        ENDDO

        N = (COUNT_IO - 1)

        offset = nb_bytes*N
        CALL MPI_FILE_WRITE_AT (fh,offset,0.0D0,1,MPI_REAL8,status,ierror) 

        time_end = MPI_WTIME()  

        PRINT *, 'My rank is', rank, 'Time for write =',time_end-time_start

        GO = 1    
        IF (rank .lt. nproc-1) THEN
            CALL MPI_SEND (GO,1, MPI_INTEGER ,rank+1,tag, MPI_COMM_WORLD ,ierror)
        ENDIF

        CALL MPI_BARRIER(MPI_COMM_WORLD,ierror)

    ENDDO

    CALL MPI_FILE_CLOSE (fh,ierror)

    CALL MPI_FINALIZE(ierror)

END PROGRAM MAIN

【问题讨论】:

  • 哇。如果您有 32 个进程在处理约 128MB 的文件,我计算出大约 640 亿次 fseek 和 640 亿次 8 字节读/写,每个都是需要实时的 I/O 操作,忽略两个障碍和 2048 次发送/接收对。所有读取都是顺序的,但是在同一个文件上并行执行多个进程的查找几乎可以保证您的文件系统/磁盘缓存无法利用这一点。我想即使在 2 个过程中,这也会停止。能否请您用文字概括一下您要做什么?
  • 从系统描述看来,主要文件系统是NFS,一般性能极差。即使您使用 Lustre 文件系统,它也不能很好地处理小文件(128 MB 是一个小文件!)。总的来说,正如 Jonathan Dursi 已经指出的那样,使用 MPI I/O 一次读取一个浮点数会导致池性能极佳。
  • 首先感谢两位的快速回复。我绝对不是 mpi-io 方面的专家,我真的可以使用一些建议。这是我试图用文字做的事情:我试图最大化一个需要大约 10 分钟来评估的函数,而我的优化例程需要大量的评估。为了加快评估速度,我构建了一个初始条件库,即单个 dat 文件。我希望每个核心都能够读取(在评估之前)和写入(之后)这个文件。如果您对如何有效地做到这一点有任何建议,我将不胜感激。
  • @user3553449 ,您在这里尝试设置的 I/O 模式是什么?据我所知,您让每个进程一次重复读取 8 个字节的相同数据,然后有一个障碍并在最后写入一次并再次执行相同的操作。如果您可以描述您尝试实现的 I/O 模式,我们可以建议替代方法。您将希望一次读取尽可能多的数据,在内存中处理它(而不是每次都进入文件系统),并在进程之间进行协调。
  • @JonathanDursi,在每次评估之前,我需要每个核心知道整个文件的内容(到目前为止,我每个核心都在读取它),一旦评估完成,我需要将结果保存到文件(将在下一次评估中使用)。我在读取之前使用计数器来设置用于决定哪个是最佳初始条件的向量的大小,并在写入之前使用以设置偏移量以附加文件。我正在按顺序进行写入和读取,以防止 2 个内核同时访问文件。再次感谢您的帮助!

标签: fortran90 hpc mpi-io


【解决方案1】:

这里要意识到的主要事情是,您可以一口气读入数据(或者,如果内存有问题,可以分块读取数据 - 但它可以是比单个双打大得多的块!) ' 不需要一次两次跳到文件末尾。

这是一个示例,它将以任意块大小读取数据,按照您的意愿处理数据,并附加一些数据(在这种情况下,每个人只需将 4 个他们的排名副本添加到文件末尾)。为简单起见,小 Python 脚本有助于编写和显示测试数据。

$ ./writedata.py 
$ ./readdata.py 
[  0.   1.   2.   3.   4.   5.   6.   7.   8.   9.  10.  11.  12.  13.  14.
  15.  16.  17.  18.  19.  20.  21.  22.  23.  24.]

$ mpirun -np 3 ./usepario
 rank:   0 got data: 0.000...   24.000 
 rank:   1 got data: 0.000...   24.000
 rank:   2 got data: 0.000...   24.000

$ ./readdata.py 
[  0.   1.   2.   3.   4.   5.   6.   7.   8.   9.  10.  11.  12.  13.  14.
  15.  16.  17.  18.  19.  20.  21.  22.  23.  24.   0.   0.   0.   0.   1.
   1.   1.   1.   2.   2.   2.   2.]

使用pario.f90:

module pario

contains
    function openFile(filename)
        use mpi
        implicit none
        integer :: openFile, ierr
        character(len=*) :: filename
        integer(MPI_OFFSET_KIND) :: off = 0

        call MPI_File_open(MPI_COMM_WORLD, filename,  &
                           ior(MPI_MODE_RDWR, MPI_MODE_UNIQUE_OPEN),  &
                           MPI_INFO_NULL, openFile, ierr)
        call MPI_File_set_view(openFile, off,  &
                               MPI_DOUBLE_PRECISION, MPI_DOUBLE_PRECISION, &
                               "native", MPI_INFO_NULL, ierr)
    end function  openFile

    subroutine closeFile(fh)
        use mpi
        implicit none
        integer :: fh, ierr
        call MPI_File_close(fh, ierr)
    end subroutine closeFile

    function filesizedoubles(fh)
        use mpi
        implicit none
        integer :: fh, ierr
        integer(MPI_OFFSET_KIND) :: filesize, filesizedoubles
        integer :: dblsize

        call MPI_File_get_size(fh, filesize, ierr)
        call MPI_type_size(MPI_DOUBLE_PRECISION, dblsize, ierr)
        filesizedoubles = filesize / dblsize
    end function filesizedoubles

    subroutine getdatablock(fh, blocksize, datablock, datasize)
        use mpi
        implicit none
        integer :: fh, ierr
        integer :: blocksize, datasize
        double precision, dimension(:) :: datablock
        integer(MPI_OFFSET_KIND) :: fileloc
        integer, dimension(MPI_STATUS_SIZE) :: rstatus

        ! you can also experiment with read_all for non collective/synchronous file
        ! access

        call MPI_File_read(fh, datablock, blocksize, MPI_DOUBLE_PRECISION, &
                           rstatus, ierr)
        call MPI_Get_count(rstatus, MPI_DOUBLE_PRECISION, datasize, ierr)
    end subroutine getdatablock

    subroutine eachappend(fh, filesize, numitems, newdata)
        use mpi
        implicit none
        integer :: fh, numitems
        integer(MPI_OFFSET_KIND) :: filesize
        double precision, dimension(:) :: newdata
        integer :: rank, ierr
        integer(MPI_OFFSET_KIND) :: offset

        call MPI_Comm_rank(MPI_COMM_WORLD, rank, ierr)
        offset = filesize + rank*numitems
        call MPI_File_write_at_all(fh, offset, newdata, numitems, &
                                    MPI_DOUBLE_PRECISION,         &
                                    MPI_STATUS_IGNORE, ierr)

    end subroutine eachappend
end module pario


program usepario
    use mpi
    use pario
    implicit none

    integer :: fileh
    integer, parameter :: bufsize=1000, newsize=4
    integer(MPI_OFFSET_KIND) :: filesize
    double precision, allocatable, dimension(:) :: curdata, newdata
    integer :: datasize
    integer :: rank, ierr

    call MPI_Init(ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD, rank, ierr)

    allocate(curdata(bufsize))

    fileh = openFile("data.dat")
    filesize = filesizedoubles(fileh)

    do
        call getdatablock(fileh, bufsize, curdata, datasize)
        !! 
        !! process data here
        !!
        !! do i=1,datasize
        !!  ...dostuff...
        !! end do
        !! 
        print '(1X,A,I3,A,F8.3,A,F8.3)', 'rank: ', rank, ' got data: ', curdata(1), '...', curdata(datasize)
        if (datasize /= bufsize) exit
    end do

    deallocate(curdata)

    allocate(newdata(newsize))
    newdata = rank

    call eachappend(fileh, filesize, newsize, newdata)
    call closeFile(fileh)

    call MPI_Finalize(ierr)
end program usepario

writedata.py:

#!/usr/bin/env python

import numpy

numdoubles = 25

data = numpy.arange(numdoubles,dtype=numpy.float64)
data.tofile("data.dat")

readdata.py:

#!/usr/bin/env python

import numpy

data = numpy.fromfile("data.dat",dtype=numpy.float64)
print data

【讨论】:

  • 非常感谢。你花时间回答我的问题给我留下了深刻的印象。你的回答充满了很好的提示。我已经将其应用到我的代码中,现在它工作得更好了。
猜你喜欢
  • 1970-01-01
  • 2023-03-20
  • 1970-01-01
  • 1970-01-01
  • 2016-03-31
  • 1970-01-01
  • 2021-06-15
  • 1970-01-01
  • 2016-05-22
相关资源
最近更新 更多