【问题标题】:Dynamic load balancing master-worker动态负载均衡主从
【发布时间】:2021-07-26 10:14:42
【问题描述】:

我有一个索引数组,我希望每个工作人员根据这些索引做一些事情。 数组的大小可能会超过rank的总数,所以我的第一个问题是除了master-worker负载均衡还有其他方法吗?我想要一个平衡系统,并且我想将每个索引分配给每个等级。

我在考虑 master-worker,在这种方法中,master rank (0) 将每个索引分配给其他等级。但是当我以 3 等级和 15 索引运行我的代码时,我的代码在 while 循环中停止以发送索引 4。我想知道是否有人可以帮助我找到问题

if(pCurrentID == 0) { // Master
   MPI_Status status;

   int nindices = 15;
   int mesg[1] = {0};
   int initial_id = 0;
   int recv_mesg[1] = {0};

  // -- send out initial ids to workers --//
   while (initial_id < size - 1) {
     if (initial_id < nindices) {
       MPI_Send(mesg, 1, MPI_INT, initial_id + 1, 1, MPI_COMM_WORLD);
       mesg[0] += 1;
       ++initial_id;
     }
   }

   //-- hand out id to workers dynamically --//
   while (mesg[0] != nindices) {
     MPI_Probe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &status);
     int isource = status.MPI_SOURCE;
     MPI_Recv(recv_mesg, 1, MPI_INT, isource, 1, MPI_COMM_WORLD, &status);
     MPI_Send(mesg, 1, MPI_INT, isource, 1, MPI_COMM_WORLD);
     mesg[0] += 1;
   }

   //-- hand out ending signals once done --//
   for (int rank = 1; rank < size; ++rank) {
     mesg[0] = -1;
     MPI_Send(mesg, 1, MPI_INT, rank, 0, MPI_COMM_WORLD);
   }
 } else { 
   MPI_Status status;
   int id[1] = {0};
   // Get the surrounding fragment id
   MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
   int itag = status.MPI_TAG;
   MPI_Recv(id, 1, MPI_INT, 0, itag, MPI_COMM_WORLD, &status);
   
   int jfrag = id[0];
   if (jfrag < 0) break;
   // do something
   MPI_Send(id, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
 }

【问题讨论】:

    标签: c++ c performance parallel-processing mpi


    【解决方案1】:

    我有一个索引数组,我希望每个工作人员根据它做一些事情 在这些指标上。数组的大小可能大于总数 排名数,所以我的第一个问题是是否有另一种方法 除了这里的主从负载平衡?我想要一个余额 系统,我还想将每个索引分配给每个等级。

    不,但如果执行每个数组索引的工作所花费的时间大致相同,您可以简单地分散进程之间的数组。

    我在考虑master-worker,在这种方法中master rank (0) 将每个索引赋予其他等级。但是当我运行我的 具有 3 等级和 15 索引的代码我的代码在 while 循环中停止 发送索引 4. 我想知道是否有人可以帮我找到 问题

    正如 cmets 中已经指出的那样,问题是您(在工作人员方面)缺少查询主工作的循环。

    负载均衡器可以如下实现:

    1. 主初始向其他工作人员发送迭代;
    2. 每个worker都在等待master的消息;
    3. 之后master从MPI_ANY_SOURCE调用MPI_Recv并等待另一个worker请求工作;
    4. 在工作人员完成第一次迭代后,它会将其排名发送给主节点,通知主节点发送新的迭代;
    5. master 读取第 4 步中 worker 发送的 rank,检查数组是否有新索引,如果仍有有效索引,则将其发送给 worker。否则,发送一条特殊消息,通知工作人员没有更多工作要执行。例如,该消息可以是-1
    6. 当工作人员收到特殊消息时,它停止工作;
    7. 当所有工人都收到特殊消息后,主人停止工作。

    这种方法的一个例子:

    #include <stdio.h>
    #include <stdlib.h>
    #include <mpi.h>
    
    int main(int argc,char *argv[]){
        MPI_Init(NULL,NULL); // Initialize the MPI environment
        int rank; 
        int size;
        MPI_Status status;
        MPI_Comm_rank(MPI_COMM_WORLD,&rank);
        MPI_Comm_size(MPI_COMM_WORLD,&size);
    
        int work_is_done = -1;
        if(rank == 0){
           int max_index = 10; 
           int index_simulator = 0;
           // Send statically the first iterations
           for(int i = 1; i < size; i++){
               MPI_Send(&index_simulator, 1, MPI_INT, i, i, MPI_COMM_WORLD); 
               index_simulator++;
           }  
           int processes_finishing_work = 0;
         
           do{
              int process_that_wants_work = 0;
              MPI_Recv(&process_that_wants_work, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &status);
              if(index_simulator < max_index){
                 MPI_Send(&index_simulator, 1, MPI_INT, process_that_wants_work, 1, MPI_COMM_WORLD);  
                 index_simulator++;
              }
              else{ // send special message 
                   MPI_Send(&work_is_done, 1, MPI_INT, process_that_wants_work, 1, MPI_COMM_WORLD);
                   processes_finishing_work++;
              }
           } while(processes_finishing_work < size - 1);
        }
        else{
            int index_to_work = 0;
            MPI_Recv(&index_to_work, 1, MPI_INT, 0, rank, MPI_COMM_WORLD, &status);    
            // Work with the iterations index_to_work
        
           do{
              MPI_Send(&rank, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
              MPI_Recv(&index_to_work, 1, MPI_INT, 0, 1, MPI_COMM_WORLD, &status);
              if(index_to_work != work_is_done)
                 // Work with the iterations index_to_work
           }while(index_to_work != work_is_done);
        }
        printf("Process {%d} -> I AM OUT\n", rank);
        MPI_Finalize();
        return 0;
     }
    

    您可以通过减少以下方式改进上述方法:1) 发送的消息数量和 2) 等待它们的时间。对于前者,您可以尝试使用分块策略(发送多个索引每个 MPI通信)。对于后者,您可以尝试使用非阻塞 MPI 通信或让两个线程 per 处理一个来接收/发送工作,另一个来实际执行工作。这种多线程方法还允许主进程实际使用数组索引,但它使方法变得非常复杂。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-07
      • 1970-01-01
      • 2016-11-12
      • 2016-09-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多