【问题标题】:MPI_Send/Recv dynamically allocated arrays in CMPI_Send/Recv C 中动态分配的数组
【发布时间】:2016-01-04 23:57:47
【问题描述】:

我正在尝试使用 MPI 在 C 中编写有限体积求解器,但似乎无法使用 MPI_Send 和 MPI_Recv 正确传递数组。我需要所有工作人员对他们的数组部分进行一些计算,然后将该子数组发送回主数组以将子数组重新组合在一起并将近似值与已知解决方案进行比较。 fvm 求解器和代码的结构是正确的,我已经根据已知解决方案检查了串行代码。下面是我尝试将子数组传回主控并在主控中接收它们的代码。我已经为 Valgrind 配置了 mpi 支持,而 memcheck 工具不喜欢 send_output_MPI 函数中的分配。这与我尝试运行程序时发生的情况一致。 Mpiexec 使用信号代码中止:6。

以下是完整的示例代码。数组的传递是在 recv_output_MPI 和 send_output_MPI 函数中,我尝试将子数组传回给 master。

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <mpi.h>
#include <math.h>
#include <time.h>
#include <string.h>
#include "fvm.h"
#include "lab_mpi.h"

int main( int argc, char *argv[] )
{
    double tt0, tt1;

//  MPI variables
    int ierr; // MPI error flag
    int nProc, nWrs, myID, rc;

//-------------Start MPI------------------
    rc = MPI_Init( &argc, &argv );
    if ( rc != MPI_SUCCESS ) {
        printf( "Error starting MPI program. Terminating.\n" );
        MPI_Abort(MPI_COMM_WORLD, rc);
     }
    ierr = MPI_Comm_size( MPI_COMM_WORLD, &nProc );
    assert( !ierr );
    ierr = MPI_Comm_rank( MPI_COMM_WORLD, &myID );
    assert( !ierr );
    nWrs = nProc - 1;
    assert( nWrs == nProc - 1 );

    if ( myID == MASTER ) {
        tt0 = MPI_Wtime( );
        master( nWrs, myID );
        tt1 = MPI_Wtime();
        printf("  Main MR timing = %lf sec on %i workers.\n", tt1 - tt0, nWrs );
        fflush(stdout);
    } else {
        ierr = worker( nWrs, myID );
        printf("  Worker %i exiting; ierr = %i\n", myID, ierr );
        fflush(stdout);
        if ( ierr != 0 )
            printf("  Worker %i exiting with ierr = %i\n", myID, ierr );
        fflush(stdout);
    }

//---------------End MPI-------------------------
    ierr = MPI_Finalize( );
    assert( !ierr );
    exit( EXIT_SUCCESS );
}

/* master MPI function
 * reads input file, packs ints and doubles into arrays, then
 * broadcasts
 * receives output from workers at t = tout
 */
int master( int nWrs, int master )
{
    unsigned int i = 0;
    int Mx; // M/nWrks gives domain decomposition
    double t = 0.0;
    char buffer[50];
    int M, MM;
    unsigned int N_max;
    double t0, t_end, dt_out, a, b, factor, D, dx, dt, t_out, dt_expl, mu;
    double* x_vals;
    double* U;

    //----MPI variables-----
    int numInts = 2;
    int numDbls = 6;
    int ierr, nProc, myID, rc;
    int *intParams;
    double *dblParams;

    fgets(buffer, sizeof(buffer), stdin);
    fscanf(stdin, "%lf %lf %lf %lf %lf %lf %lf %i",
           &t0, &t_end, &dt_out, &a, &b, &factor, &D, &MM);
    printf("t0: %lf t end: %lf  Number of CVs: %i  Factor: %3.2lf\n",
           t0, t_end, MM, factor);
        fflush(stdout);

    M = (b - a) * MM;
    dx = (double)(b - a)/M;
    dt_expl = (dx * dx)/(2 * D);
    dt = factor * dt_expl;
    N_max = (unsigned int)((t_end - t0)/dt + 1);
    t_out = max(dt_out, dt);
    mu = dt/dx;

    x_vals = ( double* ) calloc((M + 2), sizeof(double));
    U = ( double* ) calloc((M + 2), sizeof(double));
    xMesh( a, b, M, x_vals );

    intParams = ( int* ) malloc(( numInts ) * sizeof( int ));
    dblParams = ( double* ) malloc(( numDbls ) * sizeof( double ));

   // Pack arrays of variables to broadcast
    intParams[0] = N_max;
    intParams[1] = M;

    dblParams[0] = D;
    dblParams[1] = t_out;
    dblParams[2] = dt_out;
    dblParams[3] = dt;
    dblParams[4] = a;
    dblParams[5] = b;

    ierr = MPI_Bcast( intParams, numInts, MPI_INT, MASTER, MPI_COMM_WORLD );
    assert( !ierr );
    ierr = MPI_Bcast( dblParams, numDbls, MPI_DOUBLE, MASTER, MPI_COMM_WORLD );
    assert( !ierr );

    // begin timestepping
    for (i = 1; i <= N_max; i++) {
//         sent to workers
//         flux( F, U, M, dx, D, t);
//         pde( F, U, M, D, mu, t, b );
        ierr = MPI_Barrier( MPI_COMM_WORLD );
        assert( !ierr );
        t = i * dt;
        if ( t >= t_out ) {
            recv_output_MPI( nWrs, M, U ); 
            printf( "\nProfile at time: %lf, N-step: %u\n", t, i );
            fflush(stdout);
            compare( U, x_vals, M, D, t );
            t_out += dt_out;
        }
    }

    printf( "\nDone at time: %.6lf and Nsteps: %u\n\n", t, i );
    free( intParams );
    free( dblParams );
    free( U );
    free( x_vals );
    ierr = MPI_Barrier( MPI_COMM_WORLD );
    return ierr;
}
/* Tasks of WORKER:
1. Unpack initial iparms and parms arrays, local Mz = Mz / nWRs
2. Exchange "boundary" values with neighbors
3. Do timestepping computation
4. Send output to MR every dtout
*/
int worker( int nWrs, int Me )
{
    double t = 0.0;
    unsigned int i;
    int ierr;
    int nodeLHS, nodeRHS;
    int numInts = 2;
    int numDbls = 6;
    int* intParams;
    double* dblParams;

    int N_max, M;
    double D, tout, dt_out, dt, a, b, mu, dx;
    double* U;
    double* F;
    double* x_vals;

    intParams = ( int* ) malloc(( numInts ) * sizeof( int ));
    dblParams = ( double* ) malloc(( numDbls ) * sizeof( double ));

    ierr = MPI_Bcast( intParams, numInts, MPI_INT, MASTER, MPI_COMM_WORLD );
    assert( !ierr );
    ierr = MPI_Bcast( dblParams, numDbls, MPI_DOUBLE, MASTER, MPI_COMM_WORLD );
    assert( !ierr );

    N_max = intParams[0];
    M = intParams[1];

    D = dblParams[0];
    tout = dblParams[1];
    dt_out = dblParams[2];
    dt = dblParams[3];
    a = dblParams[4];
    b = dblParams[5];
    mu = (M * dt)/(b - a);
    dx = (double)(b - a)/M;

    x_vals = calloc((M + 2), sizeof(double));
    U = calloc((M + 2), sizeof(double));
    F = calloc((M + 2), sizeof(double));

    xMesh( a, b, M, x_vals );
    init( M, U ); //set u(a) = 1, u(b) = 0

    for ( i = 1; i <=N_max; i++ ) {
        ierr = MPI_Barrier( MPI_COMM_WORLD ); 
        assert( !ierr );
        t = i * dt;
//          flux( nWrs, Me, F, U, M, dx, D, t );
//          pde( nWrs, Me, F, U, M, D, mu, t, b );
        if (t >= tout) {
            send_output_MPI( nWrs, Me, M, U );
            tout += dt_out;
        }
    }

    free ( intParams );
    free ( dblParams );
    deleteMem ( U, F, x_vals );
    fflush(stdout);
    ierr = MPI_Barrier( MPI_COMM_WORLD );
    return ierr;
}
// only done by master
void recv_output_MPI( int nWrs, int M, double* U )
{
    int Ime;
    unsigned int i, source;
    unsigned int msgtag = 1000;
    int ierr, offset;
    int chunkSize = (M + 2)/nWrs;
    unsigned int end;
    double* tmp;

    MPI_Status status;
    MPI_Datatype Mytype;

    ierr = MPI_Type_contiguous( chunkSize, MPI_DOUBLE, &Mytype );
    assert ( !ierr );
    ierr = MPI_Type_commit( &Mytype );
    assert ( !ierr );

    for ( i = 1; i <= nWrs; i++ ) {
        source = i;
        msgtag = i * msgtag;
        offset = (i - 1) * chunkSize;
        end = i * chunkSize;
        tmp = ( double* ) malloc(( chunkSize ) * sizeof( double ));
        ierr = MPI_Recv( &offset, 1, MPI_INT, source, msgtag, MPI_COMM_WORLD, &status ); 
        ierr = MPI_Recv( tmp, chunkSize, MPI_DOUBLE, source, msgtag+1, MPI_COMM_WORLD, &status );
        assert ( !ierr );
        for ( i = offset; i < end; i++ ){
            U[i] = tmp[i];
        }
    }
    ierr = MPI_Type_free( &Mytype );
    assert ( !ierr );
    free( tmp );
    return;
}
void send_output_MPI( int nWrs, int Me, int M, double* U )//TODO:fix
{
    int ierr, msgtag, i;
    int start = (Me - 1) * (M/nWrs)+1;
    int chunkSize = (M + 2)/nWrs;
    unsigned int offset = ( Me - 1 ) * chunkSize;
    unsigned int end = Me * chunkSize;
    double* sendVals  = calloc( chunkSize, sizeof( double ));
    assert( sendVals != NULL );
    MPI_Datatype Mytype;

    ierr = MPI_Type_contiguous( chunkSize, MPI_DOUBLE, &Mytype );
    assert ( !ierr );
    ierr = MPI_Type_commit( &Mytype );
    assert ( !ierr );

    msgtag = Me * 1000;
    ierr = MPI_Send( &offset, 1, MPI_INT, MASTER, msgtag, MPI_COMM_WORLD ); 

//     memcpy( &sendVals[0], &U[offset], chunkSize * sizeof( double ));
    // send part of the U array
    ierr = MPI_Send( sendVals, chunkSize, Mytype, MASTER, msgtag+1, MPI_COMM_WORLD );
    assert( !ierr );
    ierr = MPI_Type_free( &Mytype );
    assert ( !ierr );
    free( sendVals );
    return;
}
/*  produces nodal values for x-array
    takes interval endpoints [a,b] as double and
    M as number of nodal values
    returns pointer to x-array
    precondition: x-array allocated in main,
    postcondition: x-array populated in main
*/
void xMesh( double a, double b, int M, double* x )
{
    unsigned int i;
    double dx = (b - a)/M;
    x[0] = a;
    for (i = 1; i < M+1; i++)
        x[i] = a + (i - 0.5)*dx;
    x[M + 1] = b;
    return;
}
/* function: init
 * takes: endpoints, a and b, x-array and pde struct
 * returns: nothing
 * precondition: U and x array initialized, x-array defined
 * postcondition: u_init set in U array
 */
void init( int M, double* U )
{
    U[0] = 1.0;
    U[M+1] = 0.0;
}

void deleteMem( double* F, double* U, double* x )
{
    free( F );
    free( U );
    free( x );
    return;
}
/* function: compare - compares numerical approximation against the
 *           closed-form solution
 * params: U - pointer to u array
           x - pointer to x (spatial) array
           M - array size
           D - diffusion coefficient
           t - time
   precondition: arrays initialized and containing values
   postcondition: file 'plot.out' appended with values for plotting
*/
void compare( double* U, double* x, int M, double D, double t )
{
    unsigned int j;
    double error, u_exact;
    double err_max = 0.0;
    double val = 2.0 * sqrt( D * t );

    for ( j = 0; j < M+2; j++ ) {
        u_exact = erfc( x[j]/val );
        error = fabs( u_exact - U[j] );
        fprintf( stdout, "%10.8lf\t%18.16lf\t%18.16lf\n", x[j], U[j], u_exact );
        fflush(stdout);
        err_max = max( error, err_max );
    }
    return;
}

读入的文本文件类似于:

t0 tEnd dtout a b factor D MM

0.0 1.0 1.0 0.0 1.0 0.9 0.1 8

我尝试使用 memcpy 和 for 循环将传递的数组复制回 U 数组,但今天早上没有任何效果。

提前致谢。

【问题讨论】:

标签: c mpi


【解决方案1】:

我看到的一些东西:

首先,我希望您只调用几次。如果不是,那么您需要重构您的逻辑,以便您只在算法的初始化阶段创建一个 MPI_Datatype。由于显而易见的原因,每次要发送信息时创建数据类型都很慢。

[如果数据类型的大小是唯一变化的,那么更好的选择是简单地停止使用 MPI_Datatypes。无论如何,您似乎只是将它们用于连续数据,因此您可以在每次发送消息时正确指定 MPI_Send/MPI_Recv 中的计数。这是我在下面的重写中采用的方法]

其次,您发送的信息量似乎是错误的。也就是说,您将sendVals 设置为大小为chunkSize。因此,当您发送U 数组的一部分时,您应该只发送chunkSize 双精度数。

ierr = MPI_Send(sendVals, chunkSize, MPI_DOUBLE, MASTER, msgtag+1, MPI_COMM_WORLD);

有效,或者使用您的 MPI_Datatype 的替代方法是

ierr = MPI_Send(sendVals, 1, Mytype, MASTER, msgtag+1, MPI_COMM_WORLD);

同样,在接收端,您应该只接收您的数据类型之一或chunkSizeMPI_DOUBLEs。

就个人而言,我会写下你的内容:

void send_output_MPI( int nWrs, int Me, int M, double* U )
{
    int ierr, msgtag, i;
    int start = (Me - 1) * (M/nWrs)+1;
    int chunkSize = (M + 2)/nWrs;
    unsigned int offset = ( Me - 1 ) * chunkSize;

    msgtag = Me * 1000;
    // send part of the U array
    ierr = MPI_Send( &U[offset], chunkSize, MPI_DOUBLE, MASTER, msgtag+1, MPI_COMM_WORLD );
    return;
}

// only done by master
void recv_output_MPI( int nWrs, int M, double* U )
{
    unsigned int i, source;
    unsigned int msgtag = 1000;
    int ierr, offset;
    int chunkSize = (M + 2)/nWrs;
    MPI_Status status;

    for ( i = 1; i <= nWrs; i++ ) {
        source = i;
        msgtag = i * msgtag;
        offset = (i - 1)*chunkSize;
        ierr = MPI_Recv( &U[offset], chunkSize, MPI_DOUBLE, source, msgtag+1, MPI_COMM_WORLD, &status );

    }
    return;
}

如果你能在大师等级上正确计算offset,则无需发送。如果您无法计算它,那么您将不得不执行之前的两次发送/接收。此外,如果您可以将它们直接放入U 数组,则无需创建临时发送/接收缓冲区。这似乎是您对 memcopy 调用所做的事情。

最后,我不保证您正确计算索引offsetend 和大小chunkSize。我只是复制了你的价值观。您可以很明显地想象,您发送的金额必须等于您收到的金额。

希望这会有所帮助。

【讨论】:

  • “由于显而易见的原因,每次要发送信息时创建数据类型都很慢。” 这不一定(而且很可能根本不是)正确。创建和注册数据类型是本地操作,在现代 CPU 上非常快,或者至少比大多数网络互连的延迟更快。当然,这并不意味着在可以规避的情况下创建不必要的数据类型。
  • 虽然通信肯定是这些函数中最慢的部分,但没有理由在每次要发送任何内容时都不断地创建和释放类型。就像你永远不会建议任何人不断地在堆上分配和释放数组一样;相反,您应该保留一个数组并覆盖数据。
  • 感谢您的回复。为了回答您的问题,仅当我们要检查我们的近似值是否接近解析解时才调用此函数。 offsetchunkSize 与在其他地方使用的域分解中使用的相同。最后,这里使用 MPI_Gather 代替 send 和 recv 会更好吗,因为我们正在收集 Master 进程中的所有子数组?
  • 这取决于您的U 数组的存储方式。在我看来,每个人都为整个U 数组分配了内存,但他们只处理其中的一部分。这不是MPI_Gather 的常见用例,但如果您注意发送/接收缓冲区索引,它仍然可以工作。通常,MPI_Gather 在每个 MPI 进程仅为其本地问题分配少量内存的情况下执行。只有大师级别已经分配了全部空间。如果您不经常调用它/担心性能,我会将其保留为发送/接收。
  • 现在发送/接收工作正常。再次感谢您的帮助。
猜你喜欢
  • 1970-01-01
  • 2013-04-28
  • 2015-05-02
  • 2014-01-25
  • 1970-01-01
  • 2013-04-07
  • 2010-10-02
  • 1970-01-01
  • 2014-08-19
相关资源
最近更新 更多