【问题标题】:How to gather arrays of unequal length with mpi4py如何使用 mpi4py 收集长度不等的数组
【发布时间】:2016-10-26 06:35:24
【问题描述】:

期望的行为:

我正在尝试在不同节点上获取许多不同长度的列表,将它们收集到一个节点中,然后让该主节点将它们放在一组中。这个列表在每个节点中被命名为rout_array。请注意,rout_array 中的元素只是整数,并且在节点间不唯一。

错误:

Traceback (most recent call last):
  File "prout.py", line 160, in <module>
    main()
  File "prout.py", line 153, in main
    num = DetermineRoutingNumber(steps, goal, vertexSetSize)
  File "prout.py", line 129, in DetermineRoutingNumber
    comm.Gather(send_buffer, recv_buffer, root = 0)

  File "MPI\Comm.pyx", line 589, in mpi4py.MPI.Comm.Gather (c:\projects\mpi4py\src\mpi4py.MPI.c:97806)
  File "MPI\msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (c:\projects\mpi4py\src\mpi4py.MPI.c:34678)
  File "MPI\msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (c:\projects\mpi4py\src\mpi4py.MPI.c:33938)
  File "MPI\msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (c:\projects\mpi4py\src\mpi4py.MPI.c:30349)
  File "MPI\msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (c:\projects\mpi4py\src\mpi4py.MPI.c:29448)

  KeyError: 'O'

当我的代码中没有字符串时,我不知道如何获得 'O' 的 KeyError。所有列表都包含整数,numpy 数组包含整数,这里唯一活动的字典只有整数作为键。需要注意的是,每个节点都会输出这个错误。

代码:

import numpy, math
from mpi4py import MPI
from sympy.combinatorics import Permutation as Perm     

def GetEdges(size,file):
    """This function takes in a file of edges in a graph in the form 'u,v'
    without quotes, where u and v are vertices of the graph. It then
    generates a permutation that swaps those vertices, and returns these
    transpositions."""

    edgeFile = open(file, "r")
    edges = []
    for line in edgeFile:
        line = line.strip()
        line = line.split(",")
        for vertex in line:
            line[line.index(vertex)] = int(vertex)
        edges.append(Perm([line], size = size))

    edgeFile.close()
    edges.append(Perm([[size - 1]], size = size))

    return edges


def AreDisjoint(p1,p2):
    """This function determines whether or not two permutations move any
    common elements, and returns the appropriate boolean."""
    v1 = set(p1.support())
    v2 = set(p2.support())

    return len(v1 & v2) == 0


def GetMatchings(edges, maxMatching, size):
    """This function takes in a set of edges given by GetEdges(), and 
    generates all possible matchings in the given graph. It then converts
    each matching into its rank given by lexicographical order, and appends
    that rank to a set, which is then returned."""

    stepDict = {1:set(edges)}
    steps = set(edges)
    for i in range(1,maxMatching):
        temp = set()
        for p1 in stepDict[1]:
            for p2 in stepDict[i]:
                newPerm = p1 * p2
                if AreDisjoint(p1,p2) and newPerm not in steps:
                    temp.add(newPerm)
                    steps.add(newPerm)

        stepDict[i+1] = temp

    newSteps = set()
    for step in steps:
        newSteps.add(step.rank())
    return newSteps


def FromRank(rank,level):
    """This function takes in a rank and size of a permutation, then returns
    the permutation that lies at the rank according to lexicographical 
    ordering. """

    lst = list(range(level + 1))
    perm = []
    while lst:
        fact = math.factorial(len(lst) - 1)
        index, rank = divmod(rank, fact)
        perm.append(lst.pop(index))
    assert rank == 0 
    return perm


def SplitArrayBetweenNodes(rank, rem, length):
    """This function takes in the rank of a node and any remainder after
    dividing up an array between all the nodes. It then returns a starting
    and ending partition index unique to each node."""
    if rem != 0:
        if rank in list(range(rem)):
            if rank == 0:
                part_start = 0
                part_end = length
            else:
                part_start = rank * (length + 1)
                part_end = part_start + length
        else:
            part_start = rank * length + rem
            part_end = part_start + length - 1
    else:
        part_start = rank * length
        part_end = part_start + length - 1

    return part_start, part_end


def DetermineRoutingNumber(steps, goal, vertexSetSize):
    """This function takes in the matchings created by GetMatchings(), 
    and calculates all possible products between its own elements. It then
    takes all unique products, and calculates all possible prducts between
    the matching set and the previous output. This repeats until all 
    permutations of a given type are found. The level at which this occurs
    is then returned."""

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    length = len(steps)
    rem = length % size
    part_len = length // size

    part_start, part_end = SplitArrayBetweenNodes(rank,rem, part_len)

    permDict = {1: steps}
    i = 1
    while True:
        rout_array = set()
        work_array = set(list(permDict[i])[part_start:part_end + 1])

        #Calculate all possible products    
        for p1 in permDict[1]:
            for p2 in work_array:
                p2_perm = Perm(FromRank(p2,vertexSetSize - 1))
                p1_perm = Perm(FromRank(p1,vertexSetSize - 1))
                new = p2_perm * p1_perm

                if new(0) == 0 or new(0) == 1:
                    order = new.rank()
                    rout_array.add(order)

        #All nodes send their work to master node
        comm.Barrier()

        send_buffer = numpy.array(rout_array)
        sendcounts = numpy.array(comm.gather(len(rout_array), root = 0))

        if rank == 0:
            recv_buffer = numpy.empty(sum(sendcounts), dtype = int)
        else:
            recv_buffer = None

        comm.Gatherv(sendbuf = send_buffer, recvbuf = (recv_buffer, sendcounts), root = 0) 

        #Generate input for next level of the loop, and weed out repeats.
        permDict[i+1] = rout_array
        for j in range(1,i+1):
            permDict[i+1] = permDict[i+1] - permDict[j]


def main():
    file = "EdgesQ2.txt"
    maxMatching = 2
    vertexSetSize = 4

    edges = GetEdges(vertexSetSize, file)
    steps = GetMatchings(edges, maxMatching, vertexSetSize)
    goal = 2 * math.factorial(vertexSetSize-1)

    num = DetermineRoutingNumber(steps, goal, vertexSetSize)
    print(num)


main()

测试用例:

EdgesQ2.txt:

请注意,此示例中的 maxMatching = 2vertexSetSize = 4。输出应该是3

0,1
1,2
2,3
0,3

EdgesQ3.txt:

请注意此示例中的 maxMatching = 4vertexSetSize = 8。输出应该是4

0,1
0,3
0,4
1,2
1,5
2,3
2,6
3,7
4,5
4,7
5,6
6,7

【问题讨论】:

  • 欢迎来到 SO。我冒昧地回答了您的根本问题,而不是KeyError。请记住,通常需要发布minimal reproducible example 以获得调试帮助。您的代码很接近,但缺少 rout_arraypermDictpart_len 以便能够重现该问题。

标签: python mpi mpi4py


【解决方案1】:

如果不同进程的长度不同,则需要使用向量变体Gatherv。使用该函数,您可以提供一个包含各种长度(recvcounts)的数组。

不幸的是,mpi4py 文档目前没有描述如何使用Gatherv 或任何其他向量变体。这是一个简单的例子:

#!/usr/bin/env python3

import numpy as np
from mpi4py import MPI
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
root = 0

local_array = [rank] * random.randint(2, 5)
print("rank: {}, local_array: {}".format(rank, local_array))

sendbuf = np.array(local_array)

# Collect local array sizes using the high-level mpi4py gather
sendcounts = np.array(comm.gather(len(sendbuf), root))

if rank == root:
    print("sendcounts: {}, total: {}".format(sendcounts, sum(sendcounts)))
    recvbuf = np.empty(sum(sendcounts), dtype=int)
else:
    recvbuf = None

comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=root)
if rank == root:
    print("Gathered array: {}".format(recvbuf))

如您所见,mpi4py 不将 sendcounts 或 recvcounts 作为额外参数,而是作为 recvbuf 参数的元组/列表。如果您传递(recvbuf, sendcounts),它将从recvbuf 派生类型。将进行位移/偏移,以便所有等级的数据连续存储并按等级排序。

基本上,mpi4py 会疯狂地猜测您使用各种形式的 recvbuf 参数可能意味着什么。完整且明确的形式是(buffer, counts, displacements, type)

编辑关于KeyError:

相当容易混淆的rout_arrayset,它不是numpy.array 的有效输入。 set 既不是序列也没有数组接口。不幸的是,numpy.array 没有失败,而是创建了一个非常奇怪的 ndarray 没有尺寸的对象。您可以将数组创建包装在一个列表中:

send_buffer = numpy.array(list(rout_array))

集体工作,但循环不会终止,考虑到DetermineRoutingNumberwhile true 循环中没有returnbreak,这并不奇怪。

【讨论】:

  • 我继续在我的代码中实现了.Gatherv(),起初得到了指向len(sendbuf)的错误len() of unsized object。不幸的是,在将其更改为len(rout_array) 后,我现在得到了KeyError: 'O'
  • 好的,我尽可能地精简代码以仍然给出一个工作示例,并添加了两个测试用例。如果您还需要什么,请告诉我。
猜你喜欢
  • 1970-01-01
  • 2016-06-30
  • 2021-05-22
  • 2021-05-20
  • 2020-02-20
  • 2023-03-27
  • 2017-11-25
  • 2023-03-23
  • 1970-01-01
相关资源
最近更新 更多