【问题标题】:Avoid losing data for jobs running on cluster避免丢失在集群上运行的作业的数据
【发布时间】:2014-04-10 03:04:47
【问题描述】:

最近,我开始在集群上工作以加快工作速度。目前我的工作是使用不同大小的输入数据来分析代码。以前我在每个输入文件的for 循环中这样做。在集群上,我使用 MPI 将每个不同的输入作为不同的进程运行程序。

MPI 脚本的源代码可以在下面找到。它运行不同的进程,还有一个“服务器”进程写出结果,以避免两个进程同时写出输出文件的情况。由于代码现在是结构化的,如果所有进程都按时结束,我只能访问输出。这是一个问题,因为我试图将我的工作的 walltime 保持在尽可能低的时间以使工作快速启动(集群总是满的,所以如果我需要太多资源,就很难开始工作)。所以有时我的工作会过早地被调度程序中断。

我的想法是在服务器进程中添加一个计时器,如果当前的 walltime 接近最大值(在下面的代码中为两分钟),则关闭文件流。这样,至少我不会丢失已经收集到的数据。然而,这将不起作用,因为计时器仅在服务器接收到新数据时才会更新。 应避免仅在服务器接收到新数据时打开文件,因为我更喜欢每次提交作业时都从空输出文件开始。还有哪些其他选项可以确保我不会丢失已收集的输出?

#include <mpi.h>
#define RES 1

int main(int argc, char *argv[]){

  int nprocs, myid, server, ndone;
  double WallTime;
  struct timeval start, end;
  double countTime, res[4];
  FILE *fpt;

  WallTime = 1*60+59;

  MPI_Comm world;
  MPI_Group world_group;
  MPI_Status status;
  MPI_Init(&argc, &argv);
  world = MPI_COMM_WORLD;
  MPI_Comm_size(world,&nprocs);
  MPI_Comm_rank(world,&myid);
  server = nprocs-1; /* last proc is server */
  MPI_Comm_group(world, &world_group);

  if(myid == server){ /* I store the output */
    ndone = 0;
    fpt = fopen(argv[2],"wt");
    gettimeofday(&start, NULL);
    do{
      MPI_Recv(res, 4, MPI_DOUBLE, MPI_ANY_SOURCE, RES, world, &status);
      fprintf(fpt,"%d\t%10.7f\t%10.7f\t%ld\n", (int) res[0], res[1], res[2], (long int) res[3]);
      gettimeofday(&end, NULL);
      countTime = (end.tv_sec+(end.tv_usec)*1.e-6)-(start.tv_sec+(start.tv_usec)*1.e-6);
      ndone++;
    } while (ndone < (nprocs-1) && countTime < WallTime);
    fclose(fpt);
  } else if(myid<(nprocs-1)){
    do sth with data according to myid ...
    MPI_Send(res, 4, MPI_DOUBLE, server, RES, world);
  }
  MPI_Finalize();
}

【问题讨论】:

    标签: c mpi hpc


    【解决方案1】:

    选项 1: 使用非阻塞探测检查消息是否正在等待,如果没有则休眠:

    do {
      int flag;
      MPI_Iprobe(MPI_ANY_SOURCE, RES, &flag, world, &status);
      if (flag) {
        MPI_Recv(res, 4, MPI_DOUBLE, status.MPI_SOURCE, RES, world, &status);
        ...
        ndone++;
      }
      else
        usleep(10000);
      gettimeofday(&end, NULL);
      countTime = (end.tv_sec+(end.tv_usec)*1.e-6)-(start.tv_sec+(start.tv_usec)*1.e-6);
    } while (ndone < (nprocs - 1) && countTime < WallTime);
    

    您可以跳过usleep() 调用,然后主进程将运行一个紧密循环,将 CPU 利用率保持在几乎 100%。在每个 MPI 等级都绑定到单独的 CPU 内核的 HPC 系统上,这通常不是问题。

    选项 2: 大多数资源管理器可以配置为在作业即将被终止之前的某个时间传递 Unix 信号。例如,Sun/Oracle Grid Engine 和 LSF 都会在作业被 SIGKILL 终止之前的某个时间提供 SIGUSR2。对于 SGE,应将 -notify 选项添加到 qsub 以使其发送 SIGUSR2。 SGE 管理员可以根据每个队列配置 SIGUSR2 和以下 SIGKILL 之间的时间量。 LSF 在到达作业结束时间时发送 SIGUSR2,如果在此之后的 10 分钟内作业没有终止,则发送 SIGKILL。

    选项 3: 如果您的资源经理不合作并且在终止您的工作之前没有发送警告信号,您可以简单地给自己发送 SIGALRM。您通常会执行以下操作:

    • 使用timer_create() 创建一个计时器;
    • (重新)使用timer_settime() 设置定时器;
    • 最后使用timer_delete()销毁计时器。

    您可以将计时器编程为在总挂钟时间之前不久到期(但这是一种不好的编程习惯,因为您必须将该值与资源管理器请求的挂钟时间相匹配)或者您可以计时器以较短的间隔触发,例如5分钟,然后每次都重新装弹。

    选项 2 和 3 要求您为相应的信号编写和设置信号处理程序。信号的好处是它们通常是异步传递的,即使你的代码被困在像MPI_Recv 这样的阻塞 MPI 调用中。我认为这是一个高级主题,并建议您暂时坚持选项 1,并记住选项 2 和 3 存在。

    选项 4: 一些 MPI 库支持正在运行的作业的检查点/重新启动。检查点创建 MPI 作业运行状态的快照,然后可以使用特殊的 mpiexec(或任何 MPI 启动器的名称,如果有)命令行标志恢复状态。这种方法需要对程序的源代码进行零更改,但通常不会广泛使用,尤其是在集群设置上。

    【讨论】:

      【解决方案2】:

      当异步终止信号从作业调度程序到达时,您似乎正在通过 fprintf() 使用缓冲文件 I/O。异步信号将中止作业,并且 glibc 将没有机会刷新其文件缓冲区。您可能很想从信号处理程序中使用 fflush(),但 fflush() 不是异步信号处理程序安全的。

      这里有一些建议可以避免过于复杂:

      无缓冲 I/O:

      简单的解决方案是将文件描述符切换为非阻塞。您可以通过以下方式执行此操作:

      setbuf(filehandle, NULL);
      

      由于这是无缓冲的,glibc 不会执行写组合。如果 fprintf() 不常见,则不会有问题。但如果您要编写许多简短的 fprintf() 调用,这可能不是提高性能的好选择。

      定期刷新文件内容

      glibc fflush() 命令可以推出缓冲区中的数据。仅当您在每个 fprintf() 之后进行 fflush 时,这才模拟无缓冲 I/O 情况。然而, fflush() 提供了更多的灵活性。由于您似乎无法依赖最大 MPI_Recv() 时间,因此您可能会考虑定期刷新文件缓冲区。

      一种方法是使用 pthread_create() 生成一个单独的线程,并让新线程定期调用 fflush(filehandle)。每秒一次应该是一个很好的频率。您需要小心确保文件句柄在两个线程之间保持有效。

      【讨论】:

      • 感谢@HristoIliev 和 Tom Gooding 的回复。使用您的建议,我想出了以下内容:服务器进程每五秒刷新一次文件指针,使用第一个回复的选项 1 和第二个 while 循环,即:do{gettimeofday(...); countTime=0.; while(countTime&lt;5){MPI_Iprobe(...); (... as suggested in option 1) gettimeofday(...); countTime=...;}fflush(fpt); } while(ndone&lt;server); 这样,将异步信号处理程序与避免使用 fflush() 并且不应该将 walltime 手动添加到源代码中。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-04-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多