【问题标题】:What is the right way to "notify" processors without blocking?在不阻塞的情况下“通知”处理器的正确方法是什么?
【发布时间】:2016-06-27 22:00:49
【问题描述】:

假设我有很多东西,我必须对所有这些东西做一些操作。 如果一个元素的操作失败,我想停止整个数组的工作[这项工作分布在多个处理器上]。

我希望在将发送/接收消息的数量保持在最低限度的同时实现这一目标。 另外,如果不需要,我不想阻塞处理器。

我如何使用 MPI 来做到这一点?

【问题讨论】:

  • 你的阵列是分布式的吗?
  • 假设每个进程都有它的本地数组,

标签: mpi


【解决方案1】:

以非阻塞方式导出此全局停止条件的一种可能策略是依赖MPI_Test

场景

考虑每个进程发布一个 MPI_INT 类型的异步接收到它的左列,并带有一个给定的标签来构建一个环。然后开始你的计算。如果一个等级遇到停止条件,它会将自己的等级发送到正确的等级。同时,每个等级在计算过程中使用MPI_Test检查MPI_Irecv是否完成,如果它完成则进入一个分支首先等待消息,然后将接收到的等级传递到右侧,除非正确的等级是等于消息的负载(不循环)。

完成后,您应该使分支中的所有进程都准备好触发任意恢复操作。

复杂性

保留的拓扑是一个环,因为它最多 (n-1) 减少了消息的数量,但它增加了传播时间。其他拓扑可以保留更多消息,但空间复杂度较低,例如具有 n.ln(n) 复杂度的树。

实施

类似的东西。

int rank, size;
MPI_Init(&argc,&argv);
MPI_Comm_rank( MPI_COMM_WORLD, &rank);
MPI_Comm_size( MPI_COMM_WORLD, &size);

int left_rank = (rank==0)?(size-1):(rank-1);
int right_rank = (rank==(size-1))?0:(rank+1)%size;

int stop_cond_rank;
MPI_Request stop_cond_request;
int stop_cond= 0;

while( 1 )
{
         MPI_Irecv( &stop_cond_rank, 1, MPI_INT, left_rank, 123, MPI_COMM_WORLD, &stop_cond_request);

         /* Compute Here and set stop condition accordingly */

         if( stop_cond )
         {
                 /* Cancel the left recv */
                 MPI_Cancel( &stop_cond_request );
                 if( rank != right_rank )
                            MPI_Send( &rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD ); 

                   break;
         }

         int did_recv = 0;
         MPI_Test( &stop_cond_request, &did_recv, MPI_STATUS_IGNORE );
         if( did_recv )
         {
                  stop_cond = 1;
                  MPI_Wait( &stop_cond_request, MPI_STATUS_IGNORE );
                  if( right_rank != stop_cond_rank )
                            MPI_Send( &stop_cond_rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD );

                   break;
          }
}

if( stop_cond )
{
      /* Handle the stop condition */
}
else
{
      /* Cleanup */
     MPI_Cancel( &stop_cond_request );
}

【讨论】:

    【解决方案2】:

    这是我已经问过自己几次但没有找到任何完全令人满意的答案的问题......我唯一想到的(除了MPI_Abort() 这样做但有点极端)是创建一个@ 987654323@ 存储一个标志,该标志将由面临问题的任何进程引发,并由所有进程定期检查以查看它们是否可以继续处理。这是使用非阻塞调用完成的,与this answer 中描述的方式相同。

    主要缺点是:

    1. 这取决于自愿检查标志状态的进程。通知他们不会立即中断他们的工作。
    2. 此检查的频率必须手动调整。您必须在浪费处理数据的时间(因为在其他地方发生了一些事情而没有必要)与检查状态所花费的时间之间找到平衡...

    最后,我们需要一种定义由诸如MPI_Abort() 之类的 MPI 调用触发的回调操作的方法(基本上用其他东西替换中止操作)。我不认为这存在,但也许我忽略了它。

    【讨论】:

      【解决方案3】:

      这似乎是一个没有简单答案的常见问题。其他两个答案都存在可扩展性问题。环形通信方法具有线性通信成本,而在单方面的MPI_Win 解决方案中,单个进程将受到来自所有工作人员的内存请求的冲击。这对于少量排名可能没问题,但在增加排名数时会出现问题。

      非阻塞集合体可以提供更具可扩展性的更好解决方案。主要思想是在除一个指定的根目录之外的所有等级上发布MPI_Ibarrier。此根将通过MPI_Irecv 侦听点对点停止消息,并在收到后完成MPI_Ibarrier

      棘手的部分是需要处理四种不同的情况“{root, non-root} x {found, not-found}”。也可能发生多个等级发送停止消息,需要在根上接收未知数量的匹配。这可以通过额外减少计算发送停止请求的排名数来解决。

      这是一个在 C 语言中的示例:

      #include <stdio.h>
      #include <stdlib.h>
      #include <mpi.h>
      
      const int iter_max = 10000;
      const int difficulty = 20000;
      
      int find_stuff()
      {
          int num_iters = rand() % iter_max;
          for (int i = 0; i < num_iters; i++) {
              if (rand() % difficulty == 0) {
                  return 1;
              }
          }
          return 0;
      }
      
      const int stop_tag = 42;
      const int root = 0;
      
      int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
      {
          int flag;
          MPI_Status status;
          if (found_count == 0) {
              MPI_Test(root_recv_stop, &flag, &status);
          } else {
              // If we find something on the root, we actually wait until we receive our own message.
              MPI_Wait(root_recv_stop, &status);
              flag = 1;
          }
          if (flag) {
              printf("Forwarding stop signal from %d\n", status.MPI_SOURCE);
              MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
              MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);
              // We must post some additional receives if multiple ranks found something at the same time
              MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
              for (found_count--; found_count > 0; found_count--) {
                  MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &status);
                  printf("Additional stop from: %d\n", status.MPI_SOURCE);
              }
              return 1;
          }
          return 0;
      }
      
      int main()
      {
          MPI_Init(NULL, NULL);
      
          int rank;
          MPI_Comm_rank(MPI_COMM_WORLD, &rank);
          srand(rank);
      
          MPI_Request root_recv_stop;
          MPI_Request all_recv_stop;
          if (rank == root) {
              MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
          } else {
              // You may want to use an extra communicator here, to avoid messing with other barriers
              MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);
          }
      
          while (1) {
              int found = find_stuff();
              if (found) {
                  printf("Rank %d found something.\n", rank);
                  // Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
                  MPI_Request req;
                  MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
                  if (rank != root) {
                      // We know that we are going to receive our own stop signal.
                      // This avoids running another useless iteration
                      MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
                      MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                      MPI_Wait(&req, MPI_STATUS_IGNORE);
                      break;
                  }
                  MPI_Wait(&req, MPI_STATUS_IGNORE);
              }
              if (rank == root) {
                  if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
                      break;
                  }
              } else {
                  int stop_signal;
                  MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
                  if (stop_signal)
                  {
                      MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                      printf("Rank %d stopping after receiving signal.\n", rank);
                      break;
                  }
              }
          };
      
          MPI_Finalize();
      }
      

      虽然这不是最简单的代码,但它应该:

      • 不引入额外的阻塞
      • 通过实施屏障进行扩展(通常为O(log N)
      • 找到一个全部停止的最坏情况延迟为 2 * 循环时间(+ 1 p2p + 1 障碍 + 1 减少)。李>
      • 如果许多/所有等级同时找到解决方案,它仍然有效,但效率可能较低。

      【讨论】:

        猜你喜欢
        • 2017-04-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-05-21
        • 1970-01-01
        • 2018-12-13
        • 1970-01-01
        相关资源
        最近更新 更多