【Question title】: Checking whether all ranks are True without using mpi4py gather and scatter
【Posted】: 2021-06-14 14:56:11
【Question description】:

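I am trying to communicate between processes so that each process is notified once all the other processes are ready. The code snippet below does this. Is there a more elegant way to do it?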

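from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
ready_agent = True  # placeholder: this process's own readiness flag

def get_all_ready_status(ready_batch):
    all_ready = all(ready_batch)
    return [all_ready for _ in ready_batch]

ready_batch = comm.gather(ready_agent, root=0)
all_ready_batch = None  # non-root ranks pass None; only root's list is scattered
if rank == 0:
    all_ready_batch = get_all_ready_status(ready_batch)
all_ready_flag = comm.scatter(all_ready_batch, root=0)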

【Question discussion】:

    Tags: python performance parallel-processing mpi mpi4py


    【Solution 1】:

    If every process needs to know which of the other processes are ready, you can use the comm.Allgather routine:

    from mpi4py import MPI
    import numpy
    
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    
    sendBuffer = numpy.ones(1, dtype=bool)
    recvBuffer = numpy.zeros(size, dtype=bool)
    
    print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
    comm.Allgather([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL])
    print("After Allgather  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
    

    Output:

    Before Allgather => Process 0 | sendBuffer [ True] | recvBuffer [False False]
    Before Allgather => Process 1 | sendBuffer [ True] | recvBuffer [False False]
    After Allgather  => Process 0 | sendBuffer [ True] | recvBuffer [ True  True]
    After Allgather  => Process 1 | sendBuffer [ True] | recvBuffer [ True  True]
    

    As @Gilles Gouaillardet pointed out in the comments:

    If all processes only need to know whether all of them are ready, then MPI_Allreduce() is more appropriate.

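    The idea is that, in theory, Allreduce should be faster than Allgather, because the former can use a tree-based communication pattern and because it needs to allocate and communicate less memory: each process ends up with a single value instead of one value per process. More information can be found here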
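To make the "less data" argument concrete, here is a small pure-Python simulation (no MPI involved) of recursive doubling, one common way an Allreduce can be implemented. It is a butterfly exchange rather than a strict tree, but it has the same log2(p) depth. It assumes the number of processes is a power of two, and real MPI libraries choose among several algorithms, so treat it only as an illustration. The function name `simulate_allreduce_land` is hypothetical.

```python
import operator


def simulate_allreduce_land(flags):
    """Simulate a recursive-doubling Allreduce with logical AND.

    flags[i] is the input of simulated rank i; len(flags) must be a power of two.
    Returns (per-rank results, number of exchange rounds).
    """
    p = len(flags)
    if p == 0 or p & (p - 1):
        raise ValueError("number of ranks must be a power of two")
    values = list(flags)
    rounds = 0
    distance = 1
    while distance < p:
        # Each round, rank i combines its one-element partial result with
        # that of rank i XOR distance (all exchanges happen "simultaneously",
        # so the new list is built entirely from the previous round's values).
        values = [operator.and_(values[i], values[i ^ distance]) for i in range(p)]
        distance *= 2
        rounds += 1
    return values, rounds


result, rounds = simulate_allreduce_land([True, False, True, True])
print(result, rounds)
```

After log2(p) rounds every simulated rank holds the AND of all flags, and each exchange carries a single boolean; an Allgather, by contrast, leaves p flags on every rank.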

    In your case, use MPI.LAND (i.e., logical AND) as the Allreduce reduction operator.

    An example:

    from mpi4py import MPI
    import numpy
    
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    
    # First run: even ranks are ready (True), odd ranks are not (False).
    sendBuffer = numpy.ones(1, dtype=bool) if rank % 2 == 0 else numpy.zeros(1, dtype=bool)
    recvBuffer = numpy.zeros(1, dtype=bool)
    
    print("Before Allreduce => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
    comm.Allreduce([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL], MPI.LAND)
    print("After Allreduce  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
    
    comm.Barrier()
    if rank == 0:
        print("Second RUN")
    comm.Barrier()
    
    # Second run: every rank is ready.
    sendBuffer = numpy.ones(1, dtype=bool)
    recvBuffer = numpy.zeros(1, dtype=bool)
    
    print("Before Allreduce => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
    comm.Allreduce([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL], MPI.LAND)
    print("After Allreduce  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
    

    Output:

    Before Allreduce => Process 1 | sendBuffer [False] | recvBuffer [False]
    Before Allreduce => Process 0 | sendBuffer [ True] | recvBuffer [False]
    After Allreduce  => Process 1 | sendBuffer [False] | recvBuffer [False]
    After Allreduce  => Process 0 | sendBuffer [ True] | recvBuffer [False]
    Second RUN
    Before Allreduce => Process 0 | sendBuffer [ True] | recvBuffer [False]
    Before Allreduce => Process 1 | sendBuffer [ True] | recvBuffer [False]
    After Allreduce  => Process 0 | sendBuffer [ True] | recvBuffer [ True]
    After Allreduce  => Process 1 | sendBuffer [ True] | recvBuffer [ True]
    

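    In the first part of the output (i.e., before "Second RUN"), the result is False because the processes with odd ranks are not ready (False) while the processes with even ranks are ready (True). Therefore, False & True => False. In the second part, the result is True because all processes are ready.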
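The reasoning above is just a logical AND over the per-rank flags. A small pure-Python sketch (no MPI; the helper names `expected_land`, `first_run_flags` and `second_run_flags` are hypothetical) reproduces the value every rank should see in `recvBuffer` for both runs, for any number of ranks:

```python
import operator
from functools import reduce


def expected_land(flags):
    """Value every rank should see after Allreduce(..., MPI.LAND)."""
    return reduce(operator.and_, flags)


def first_run_flags(size):
    # Even ranks send True (ready), odd ranks send False (not ready).
    return [rank % 2 == 0 for rank in range(size)]


def second_run_flags(size):
    # Every rank sends True.
    return [True] * size


for size in (1, 2, 4):
    print(size, expected_land(first_run_flags(size)), expected_land(second_run_flags(size)))
```

Note that with a single process the first run would also report True, since rank 0 is the only rank and it is ready.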
