【问题标题】:Using MPI to take in data temporarily use it and then return a result使用 MPI 取数据临时使用它然后返回结果
【发布时间】:2015-12-09 16:12:45
【问题描述】:

我有一个起始位置数组来读取程序中每个婴儿节点的文件。我正在尝试让头节点发送位置以开始将文件读取到每个节点,然后让它们发回结果。
造成困难的是文件中没有完美数量的节点到行,因此必须一遍又一遍地使用它们。为此,我尝试使用for 循环进行发送和接收,其中头节点发送文件中行数的消息,而婴儿节点接收文件中行数除以婴儿节点数的消息。

简而言之,它不适合我,我真的不知道该怎么办。

if(qNum == 1){  //If query Number is one
    if(firstSource == 1){ //And the source is equal to 1
        if(my_rank == 0){ // if this process is the head node
            int startVal = 0; // declare variable for starting value
            int z = 1; // declare variable to loop through baby nodes
            for(int i = 1; i <= enronInfo[0]; i++){  // for # of lines in file
                if(z == world_size){ // if process num equals largest process num reset to 1
                    z = 1;
                }
                startVal = getFseekVal(i, firstSource); //set the startVal to the value at location I in the array.
                MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD); //send a message to processor z with the startVal
            }
            MPI_Barrier(MPI_COMM_WORLD); //Don't know if this helps
            if(my_rank != 0){ //if not the headnode
                int startVal; // declare variable for starting value
                for(int i = 0; i<=babyLoopSize; i++){ // for # of lines in processor divided by # of babynodes
                    MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // receive a message with startVal from the headnode
                }
            }
            MPI_Barrier(MPI_COMM_WORLD); // Don't know if this helps
        }
    }







#include <fstream>
#include <string>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

using namespace std;
int enronDSarr[39859], nipsDSarr[1499], kosDSarr[3429], nytDSarr[299999]; //containers for docstring values
string enronV = "/home/mcconnel/BagOfWords/vocab.enron.txt";
string nipsV = "/home/mcconnel/BagOfWords/vocab.nips.txt";
string kosV = "/home/mcconnel/BagOfWords/vocab.kos.txt";
string nytV = "/home/mcconnel/BagOfWords/vocab.nytimes.txt";
string enronDW = "/home/mcconnel/BagOfWords/docword.enron.txt";
string nipsDW = "/home/mcconnel/BagOfWords/docword.nips.txt";
string kosDW = "/home/mcconnel/BagOfWords/docword.kos.txt";             // Strings for locations of each file
string nytDW = "/home/mcconnel/BagOfWords/docword.nytimes.txt";
string enronDS = "/home/mcconnel/BagOfWords/docstart.enron.txt";
string nipsDS = "/home/mcconnel/BagOfWords/docstart.nips.txt";
string kosDS = "/home/mcconnel/BagOfWords/docstart.kos.txt";
string nytDS = "/home/mcconnel/BagOfWords/docstart.nytimes.txt";
int enronInfo[3], nipsInfo[3], kosInfo[3], nytInfo[3];                  // Arrays storing first 3 lines of info from each DocWords file
int firstSource, secondSource, numTimes, qNum, wordLength;
string enteredWord;
char enteredWordChar[50];



int word2int(string fileLocation, string input){                // converts text from a file into a word count value
        ifstream file;
        file.open(fileLocation.c_str());
        string word;
        int i = 1;
        int wordNum;
        while(file.good()){
                file >> word;
                if(word.compare(input)== 0){
                        wordNum = i;
                        return wordNum;
                }
                i++;
        }
        return wordNum;
}

int getFseekVal(int docNumber, int sourceNumber){
        if(sourceNumber  == 1){
                return enronDSarr[docNumber - 1];
        }
        else if(sourceNumber == 2){
                return nipsDSarr[docNumber - 1];
        }
        else if(sourceNumber == 3){
                return kosDSarr[docNumber - 1];
        }
        else{
                return nytDSarr[docNumber - 1];
        }
}


string int2word(string fileLocation, int wordInt){              // converts a word count value from a file into the actual text
        ifstream file;
        file.open(fileLocation.c_str());
        string word;
        int i = 1;
        string retWord;
        while(file.good()){
                file >> word;
                if(i == wordInt){
                        retWord = word;
                        return retWord;
                }
                i++;
        }
        return 0;
}


int getInfoDW(string fileLocation, int pos){                    //imports an array of length 3 for each document's info in the docwords file
        ifstream file;
        file.open(fileLocation.c_str());
        int word;
        int i = 0;
        int retWord;
        while(file.good()){
                file >> word;
                if(i == 0 && pos == 0){
                        return word;
                }
                if(i == 1 && pos == 1){
                        return word;
                }
                if(i == 2 && pos == 2){
                        return word;
                }
                i++;
        }
        return retWord;
}

int getEnronDS(string fileLocation){                            // imports array from Enron docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(), "r");
    for(i = 0; i<39861; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                enronDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}


int getNipsDS(string fileLocation){                             // imports array from NIPS docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(),"r");
    for(i = 0; i<1500; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                nipsDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}

int getKosDS(string fileLocation){                              // imports array from KOS docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(),"r");
    for(i = 0; i<3430; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                kosDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}

int getNytDS(string fileLocation){                              // imports array from NYT docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(),"r");
    for(i = 0; i<300000; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                nytDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}

int getCurrentDS(int fileNumber, int documentNum){              //Will be used to return docstart byte value at document location
        if(fileNumber == 0){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 1){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 2){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 3){
                return enronDSarr[documentNum - 1];
        }
        else{
                printf("Something definitely went wrong");
        }
}

int getSourceNumber(){
        int source;
        printf("Select a wordbag:\n 1. Enron \n 2. NIPS \n 3. KOS\n 4. NYT\n");
        cin >> source;
        return source;
}
int getUserResponse(){
   int i = 1;
   while(i){
        printf("Choose a query(1-4) and press enter:\n");
        printf("1. What percent of documents in X use any one word more than ____ times?\n");
        printf("2. What words in X are used more than ____ times in any document?\n");
        printf("3. In which data set does the word ____ appear most frequently?\n");
        printf("4. Does ____ appear more frequently in X or Y?\n");
        cin >> qNum;
        if(qNum < 5 && qNum > 0){
                i = 0;
                printf("%d", qNum);
        }
        else{
                printf("Invalid Response, Please Try Again \n");
        }
        if(qNum == 1){
                firstSource = getSourceNumber();
                printf("and how many times?\n");
                cin >> numTimes;
               // query1(firstSource, numTimes);

        }
        else if(qNum == 2){
                firstSource = getSourceNumber();
                printf("and how many times?\n");
                cin >> numTimes;

        }
        else if(qNum == 3){
                printf("What word would you like to use?\n");
                cin >> enteredWord;
        }
        else if(qNum == 4){
                printf("What word would you like to use?\n");
                cin >> enteredWord;
                printf("Select your first source...\n");
                firstSource = getSourceNumber();
                printf("Select your second source...\n");
                secondSource = getSourceNumber();
        }

    }
}

void importFiles(){
        getEnronDS(enronDS);
        getNipsDS(nipsDS);
        getKosDS(kosDS);                                                        // Functions to read in arrays for each docstart fil
        getNytDS(enronDS);
        for(int a = 0; a <= 2; a++){
                 enronInfo[a] = getInfoDW(enronDW, a);
                 nipsInfo[a] = getInfoDW(nipsDW, a);
                 kosInfo[a] = getInfoDW(kosDW, a);
                 nytInfo[a] = getInfoDW(nytDW, a);
        }
}




int main(int argc, char** argv){
        int world_size, my_rank, numDocs,fseekVal, babyLoopSize, babyLoopSize2, babyLoopSize3, babyLoopSize4;
        MPI_Init(NULL,NULL);
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);
        MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
        cout << " my rank is " << my_rank << "\n";
        importFiles();
        cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << "\n";
        if(my_rank == 0){
                getUserResponse();
        }
        MPI_Barrier(MPI_COMM_WORLD);
        babyLoopSize = enronInfo[0] / (world_size - 1);
        babyLoopSize2 = nipsInfo[0] / (world_size - 1);
        babyLoopSize3 = kosInfo[0] / (world_size - 1);
        babyLoopSize4 = nytInfo[0] / (world_size - 1);
        //cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << " and that my babyLoopSize for enron is " << babyLoopSize << "\n";
        //cout << " my rank is " << my_rank << " I know that qNum is  " << qNum << "\n";
        MPI_Bcast(&qNum, 1, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&firstSource, 2, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&secondSource,3, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&enteredWordChar,4, MPI_CHAR, 0, MPI_COMM_WORLD);


        if(qNum == 1){
                if(firstSource == 1){
                        if(my_rank == 0){
                                int startVal = 0;
                                int z = 1;
                                for(int i = 1; i <= enronInfo[0]; i++){
                                        if(z == world_size){
                                                z = 1;
                                        }
                                        startVal = getFseekVal(i, firstSource);
                                        MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD);
                                }
                        if(my_rank != 0){
                                int startVal;
                                for(int i = 0; i<=babyLoopSize; i++){
                                        MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                                }
                        }
                        MPI_Barrier(MPI_COMM_WORLD);
                }
        }




        MPI_Finalize();

}

【问题讨论】:

  • 嗯,重新缩进代码显示的第一件事是if(my_rank == 0){ 块包含if(my_rank != 0){ 一个!我猜在第一个MPI_Barrier(MPI_COMM_WORLD); 之前缺少一个额外的}。先解决这个问题,然后我们看看你是否还有问题。
  • 可能复制和粘贴出错了,我将把整个程序都安装起来以防万一
  • 我确认在您的完整代码中,if(my_rank != 0){ 块位于 if(my_rank == 0){ 块内。这显然是一个你不能忽视的错误。先解决这个问题,看看有什么效果。

标签: c++ mpi


【解决方案1】:

MPI_Send 通常是一个阻塞命令,参见here

此例程可能会阻塞,直到消息被 目标进程。

这意味着每次在主进程上运行MPI_Send,相应的从进程必须运行MPI_Recv,主进程才能继续。通过包含第一行MPI_Barrier,您告诉每个从站等待主站完成发送所有数据,然后再让从站接收数据,但是由于MPI_Send 的阻塞性质,主站永远不会从第一个返回致电MPI_Send

对于您的问题,我建议先将其分解成更小的部分。编写代码来回答每个子问题:

// Determine how many workers we have
int nWorkers = ...;

// Determine where in the file each worker should start
// If we store each location in an array we can make use of
// another MPI command latter
int aStartingLocs[nWorkers] = {...};

// Distribute starting locations to each worker
int nMyStart;
MPI_Scatter(aStartingLocs, 1, MPI_INT,   //< Things to send
            &nMyStart, 1, MPI_INT,       //< Recieved value
            ...);

// Our starting location is stored in nMyStart
// TODO: Use starting location to compute results
double dResult = ...;

// We can use MPI_Gather to send the values back to master
double aResults[nWorkers];
MPI_Gather(&dResult, 1, MPI_DOUBLE,    //< What we are sending
           aResults, 1, MPI_DOUBLE,    //< Where to store the result
           ...);

// Now we can use the result how we choose
if (bIAmMaster)
{
   // TODO: Use the results
}

以上只是一个大纲(半伪代码/注释),但希望您能够将其用作指南。

我有一段时间没有使用 MPI,所以你应该明确检查 MPI_ScatterMPI_Gather 的语法。 Check out the first link that came up on Google.

如果您需要分散一些不能被工人数量整除的项目,您可以改用MPI_ScattervMPI_Gatherv,请参阅man page

如果您愿意,可以尝试实现自己的MPI_Scatter(v) 版本,我记得有一个 HPC 任务就是这样做的。但是,大多数情况下,只使用库函数可能更容易/更好。

【讨论】:

  • 我不想使用 scatter 的唯一原因是我的婴儿节点比程序中的行少。 scatter 会让这种情况容易处理吗?
  • 我不确定是否有内置方法,但是您可以为这些节点传递一个无效的数字(例如 -1),并且任何接收到无效值的节点都可以用 a虚拟值。然后,master 可以只考虑有效值或它没有发送 -1 的进程。
  • @BrandonJerz 原来有库函数可以完全按照您的意愿行事,MPI_Scatterv,请参阅我的上次编辑。
猜你喜欢
  • 2021-05-15
  • 2016-07-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-04
  • 2022-10-13
  • 2015-07-08
相关资源
最近更新 更多