【问题标题】:Sending distributed chunks of a 2D array to the root process in MPI将二维数组的分布式块发送到 MPI 中的根进程
【发布时间】:2011-07-28 16:31:53
【问题描述】:

我有一个分布在 MPI 进程网格中的二维数组(本例中为 3 x 2 进程)。数组的值是在该数组块分配到的进程中生成的,我想在根进程中将所有这些块收集在一起以显示它们。

到目前为止,我有下面的代码。这会生成一个笛卡尔通信器,找出 MPI 进程的坐标,并根据该坐标计算出它应该获得多少数组(因为数组不必是笛卡尔网格大小的倍数)。然后,我创建了一个新的 MPI 派生数据类型,它将整个进程子数组作为一个项目发送(也就是说,每个进程的步幅、块长度和计数都不同,因为每个进程都有不同大小的数组)。但是,当我将数据与 MPI_Gather 一起收集时,我遇到了分段错误。

我认为这是因为我不应该在 MPI_Gather 调用中使用相同的数据类型进行发送和接收。数据类型适合发送数据,因为它具有正确的计数、步幅和块长度,但是当它到达另一端时,它需要一个非常不同的派生数据类型。我不确定如何计算此数据类型的参数 - 有人有什么想法吗?

另外,如果我从完全错误的角度接近这个问题,请告诉我!

#include<stdio.h>
#include<array_alloc.h>
#include<math.h>
#include<mpi.h>

int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2];
    int periods[2];
    int A = 2;
    int B = 3;
    MPI_Comm cart_comm;
    MPI_Datatype block_type;
    int coords[2];

    float **array;
    float **whole_array;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int x_start, x_finish;
    int y_start, y_finish;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }

    /* Create the cartesian communicator */
    dim_size[0] = B;
    dim_size[1] = A;
    periods[0] = 1;
    periods[1] = 1;

    MPI_Cart_create(MPI_COMM_WORLD, 2, dim_size, periods, 1, &cart_comm);

    /* Get our co-ordinates within that communicator */
    MPI_Cart_coords(cart_comm, rank, 2, coords);

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);

    if (coords[0] == (B - 1))
    {
        /* We're at the far end of a row */
        cols_per_core = n - (cols_per_core * (B - 1));
    }
    if (coords[1] == (A - 1))
    {
        /* We're at the bottom of a col */
        rows_per_core = n - (rows_per_core * (A - 1));
    }

    printf("X: %d, Y: %d, RpC: %d, CpC: %d\n", coords[0], coords[1], rows_per_core, cols_per_core);

    MPI_Type_vector(rows_per_core, cols_per_core, cols_per_core + 1, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);

    array = alloc_2d_float(rows_per_core, cols_per_core);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        return 1;
    }

    for (j = 0; j < rows_per_core; j++)
    {
        for (i = 0; i < cols_per_core; i++)
        {
            array[j][i] = (float) (i + 1);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);

    MPI_Gather(array, 1, block_type, whole_array, 1, block_type, 0, MPI_COMM_WORLD);

    /*
    if (rank == 0)
    {
        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }
    */
    /* Close down the MPI environment */
    MPI_Finalize();
}

我上面使用的二维数组分配例程是这样实现的:

float **alloc_2d_float( int ndim1, int ndim2 ) {

  float **array2 = malloc( ndim1 * sizeof( float * ) );

  int i;

  if( array2 != NULL ){

    array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );

    if( array2[ 0 ] != NULL ) {

      for( i = 1; i < ndim1; i++ )
    array2[i] = array2[0] + i * ndim2;

    }

    else {
      free( array2 );
      array2 = NULL;
    }

  }

  return array2;

}

【问题讨论】:

  • 您的二维数组究竟是如何分配的?可以发一下alloc_2d_float的实现吗?
  • 啊,是的 - 抱歉,我正在使用朋友提供的库例程,忘记提供该代码。我已经更新了问题以包含该代码。

标签: c mpi parallel-processing


【解决方案1】:

这是一个棘手的问题。您在正确的轨道上,是的,您将需要不同的类型来发送和接收。

发送部分很简单——如果你发送整个子数组array,那么你甚至不需要向量类型;您可以从&amp;(array[0][0])(或array[0],如果您愿意)开始发送整个(rows_per_core)*(cols_per_core) 连续浮点数。

正如您所收集的那样,接收是棘手的部分。让我们从最简单的情况开始——假设所有内容均分,因此所有块的大小相同。然后你可以使用非常有用的MPI_Type_create_subarray(你总是可以将它与向量类型拼凑在一起,但是对于高维数组,这变得乏味,因为你需要为数组的每个维度创建一个中间类型,除了最后一个...... .

此外,您可以使用同样有用的MPI_Dims_create 来创建一个尽可能多的等级分解,而不是对分解进行硬编码。笔记 这不一定与 MPI_Cart_create 有任何关系,尽管您可以将它用于请求的尺寸。我将在这里跳过 cart_create 的东西,不是因为它没有用,而是因为我想专注于收集的东西。

所以如果每个人的array的大小都相同,那么root从每个人那里接收的数据类型都是相同的,可以使用非常简单的子数组类型来获取他们的数据:

MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts,
                         MPI_ORDER_C, MPI_FLOAT, &block_type);
MPI_Type_commit(&block_type);

sub_array_size[] = {rows_per_core, cols_per_core}whole_array_size[] = {n,n} 和这里的starts[]={0,0} - 例如,我们只是假设一切都开始了。 这样做的原因是我们可以使用 Gatherv 将位移显式设置到数组中:

for (int i=0; i<size; i++) {
    counts[i] = 1;   /* one block_type per rank */

    int row = (i % A);
    int col = (i / A);
    /* displacement into the whole_array */
    disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
}

MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT,
            recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

所以现在每个人都以一个块的形式发送他们的数据,然后将其接收到数组右侧的类型中。为此,我调整了类型的大小,使其范围仅为一个浮点数,因此可以以该单位计算位移:

MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
MPI_Type_commit(&resized_type);

整个代码如下:

#include<stdio.h>
#include<stdlib.h>
#include<math.h>
#include<mpi.h>

float **alloc_2d_float( int ndim1, int ndim2 ) {
    float **array2 = malloc( ndim1 * sizeof( float * ) );
    int i;

    if( array2 != NULL ){
        array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );
        if( array2[ 0 ] != NULL ) {
            for( i = 1; i < ndim1; i++ )
                array2[i] = array2[0] + i * ndim2;
        }

        else {
            free( array2 );
            array2 = NULL;
        }
    }
    return array2;
}

void free_2d_float( float **array ) {
    if (array != NULL) {
        free(array[0]);
        free(array);
    }
    return;
}

void init_array2d(float **array, int ndim1, int ndim2, float data) {
    for (int i=0; i<ndim1; i++) 
        for (int j=0; j<ndim2; j++)
            array[i][j] = data;
    return;
}

void print_array2d(float **array, int ndim1, int ndim2) {
    for (int i=0; i<ndim1; i++) {
        for (int j=0; j<ndim2; j++) {
            printf("%6.2f ", array[i][j]);
        }
        printf("\n");
    }
    return;
}


int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2];
    int periods[2];
    MPI_Datatype block_type, resized_type;

    float **array;
    float **whole_array;
    float *recvptr;

    int *counts, *disps;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int whole_array_size[2];
    int sub_array_size[2];
    int starts[2];
    int A, B;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);
        recvptr = &(whole_array[0][0]);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        print_array2d(whole_array, n, n);
        puts("\n\n");
    }

    /* Create the cartesian communicator */
    MPI_Dims_create(size, 2, dim_size);
    A = dim_size[1];
    B = dim_size[0];
    periods[0] = 1;
    periods[1] = 1;

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);
    if (rows_per_core*A != n) {
        if (rank == 0) fprintf(stderr,"Aborting: rows %d don't divide by %d evenly\n", n, A);
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    if (cols_per_core*B != n) {
        if (rank == 0) fprintf(stderr,"Aborting: cols %d don't divide by %d evenly\n", n, B);
        MPI_Abort(MPI_COMM_WORLD,2);
    }

    array = alloc_2d_float(rows_per_core, cols_per_core);
    printf("%d, RpC: %d, CpC: %d\n", rank, rows_per_core, cols_per_core);

    whole_array_size[0] = n;             
    sub_array_size  [0] = rows_per_core; 
    whole_array_size[1] = n;
    sub_array_size  [1] = cols_per_core;
    starts[0] = 0; starts[1] = 0;

    MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts, 
                             MPI_ORDER_C, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);
    MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
    MPI_Type_commit(&resized_type);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        MPI_Abort(MPI_COMM_WORLD,3);
    }

    init_array2d(array,rows_per_core,cols_per_core,(float)rank);

    counts = (int *)malloc(size * sizeof(int));
    disps  = (int *)malloc(size * sizeof(int));
    /* note -- we're just using MPI_COMM_WORLD rank here to
     * determine location, not the cart_comm for now... */
    for (int i=0; i<size; i++) {
        counts[i] = 1;   /* one block_type per rank */

        int row = (i % A);
        int col = (i / A);
        /* displacement into the whole_array */
        disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
    }

    MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT, 
                recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

    free_2d_float(array);
    if (rank == 0) print_array2d(whole_array, n, n);
    if (rank == 0) free_2d_float(whole_array);
    MPI_Finalize();
}

小事——在聚集之前你不需要屏障。事实上,你几乎不需要障碍,它们是昂贵的操作,出于某些原因,并且可以隐藏问题——我的经验法则是永远不要使用障碍,除非你确切知道为什么需要使用障碍在这种情况下坏了。特别是在这种情况下,集体 gather 例程与屏障执行完全相同的同步,因此只需使用它即可。

现在,转向更难的东西。如果事情平均分配,您有几个选择。最简单的,虽然不一定是最好的,只是填充数组,以便它确实均匀划分,即使只是为了这个操作。

如果你可以安排它使列数均匀分布,即使行数不均匀,那么你仍然可以使用gatherv并为行的每个部分创建一个向量类型,然后gatherv每个处理器的适当行数。那会很好用。

如果你肯定有两个都不能指望分割的情况,并且你不能填充数据进行发送,那么我可以看到三个子选项:

  • 正如 susterpatt 建议的那样,进行点对点操作。对于少量任务,这很好,但随着它变得越来越大,这将大大低于集体操作的效率。
  • 创建一个由所有不在外边缘的处理器组成的通信器,并完全使用上面的代码来收集它们的代码;然后点对点边缘任务的数据。
  • 根本不聚集处理 0;使用Distributed array type描述数组的布局,使用MPI-IO将所有数据写入文件;完成后,您可以让进程零以某种方式显示数据。

【讨论】:

    【解决方案2】:

    看起来MPI_Gather 调用的第一个参数应该是array[0],而不是array

    另外,如果您需要从每个排名中获取不同数量的数据,最好使用MPI_Gatherv

    最后,并不是说在一个地方收集所有数据以进行输出在许多情况下都是不可扩展的。随着数据量的增长,最终,它将超过排名 0 的可用内存。分配输出工作(如果您正在写入文件,使用 MPI IO 或其他库调用)或做点可能会更好。 to-point 一次向 0 级发送一个,以限制总内存消耗。

    另一方面,我建议将您的每个等级打印一个接一个地协调到标准输出,因为一些主要的 MPI 实现不保证标准输出将按顺序生成.尤其是 Cray 的 MPI,如果打印多个等级,它会非常彻底地混淆标准输出。

    【讨论】:

      【解决方案3】:

      根据this(我强调):

      集体操作的类型匹配条件比点对点的发送者和接收者之间的对应条件更严格。即对于集体操作,发送的数据量必须与接收方指定的数据量完全匹配。仍然允许发送者和接收者之间的不同类型映射。

      听起来你有两个选择:

      1. 填充较小的子矩阵,以便所有进程发送相同数量的数据,然后在 Gather 之后将矩阵裁剪回其原始大小。如果您喜欢冒险,您可以尝试定义接收类型映射,以便在 Gather 操作期间自动覆盖填充,从而消除之后对裁剪的需要。不过这可能会有点复杂。
      2. 回退到点对点通信。更直接,但沟通成本可能更高。

      就个人而言,我会选择选项 2。

      【讨论】:

        猜你喜欢
        • 2011-04-14
        • 1970-01-01
        • 2012-03-05
        • 2011-05-21
        • 2019-08-12
        • 2013-12-12
        • 1970-01-01
        • 2013-01-12
        相关资源
        最近更新 更多