【问题标题】:Parallelise code with pthread and semaphore使用 pthread 和 semaphore 并行化代码
【发布时间】:2016-10-15 13:03:31
【问题描述】:

我需要使用 Pthreads 和使用 C 的信号量使这段代码并行工作。它应该最多可以使用 16 个线程。

这是串行工作的代码:

    for(counter = numberOfPoints; counter > numberOfClusters; counter--){

        double minimunValue = 9999999999999;

        for(i = 0; i < counter; i++){
            for(j = 0; j < counter; j++){
                if((distanceMatrix[i][j] < minimunValue) && (i != j)){

                    minimunValue = distanceMatrix[i][j];
                    cityA = i;
                    cityB = j;
                }
            }
        }


        for(j = 0; j < counter; j++){
            if(j == cityA) 
                distanceMatrix[cityA][j] = 0;

            if(distanceMatrix[cityA][j] > distanceMatrix[cityB][j]){
                distanceMatrix[cityA][j] = distanceMatrix[cityB][j];
                distanceMatrix[j][cityA] = distanceMatrix[cityB][j];
            }
        }


        for(i = 0; i < counter; i++){
            for(j = cityB; j < counter - 1; j++){
                distanceMatrix[i][j] = distanceMatrix[i][j+1];
            }
        }

        for(i = cityB;i < (counter-1);i++){
            distanceMatrix[i] = distanceMatrix[i+1];  

        }
        //more code here but it doesn't matter for now
    }

我做了什么:

我创建了 2 个这样的信号量:

sem_t* mutex;
sem_t* lock;

    //inside main()
    int rc = sem_unlink("mutex");
    if (rc)
        perror("sem_unlink mutex");

    rc = sem_unlink("barrier");
    if (rc)
        perror("sem_unlink barrier");

    numberOfClusters = atoi(argv[2]);
    numberOfThreads = atoi(argv[3]);

    mutex = sem_open("mutex", O_CREAT, S_IRUSR | S_IWUSR, numberOfThreads);
    lock = sem_open("barrier", O_CREAT, S_IRUSR | S_IWUSR, numberOfThreads);

在我实现如下屏障后:

void barrier() {
    sem_wait(mutex);
    if (threadsWorking == (numberOfThreads - 1)) {
        threadsWorking = 0;
        sem_post(mutex);

        int i;
        for (i = 0; i < (numberOfThreads - 1); i++) {
            sem_post(lock);
        }
    } else {
        threadsWorking++;
        sem_post(mutex);
        sem_wait(lock);
    }
}  

现在,将在每个线程上调用的函数:

void *thread_clustering(void *thread_id){
    int i, j;
    int thread_counter = thread_id;

    //this is the 2nd for, this part is working fine using pthreads
    for(i = thread_counter; i < counter; i += numberOfThreads){
        if(distanceMatrix[cityA][i] > distanceMatrix[cityB][i]){
            distanceMatrix[cityA][i] = distanceMatrix[cityB][i];
            distanceMatrix[i][cityA] = distanceMatrix[cityB][i];
        }
    }

    barrier();

    //this commented code is about the 3rd for, but it is not working as I want
    // for(i = thread_counter; i < counter; i += numberOfThreads){
    //     for(j = cityB; j < counter - 1; j++){
    //         distanceMatrix[i][j] = distanceMatrix[i][j+1];
    //     }
    // }

    // barrier();
}

这就是我认为我的最终代码在所有并行化之后的样子:

for(counter = numberOfPoints; counter > numberOfClusters; counter--){
    for(i = 0; i < numberOfThreads; i++){
        pthread_create(&threads[i],NULL, thread_clustering, (void *)i);
    }  

    for(i = 0; i < numberOfThreads; i++){
        pthread_join(threads[i], NULL);
    } 
    //more code here
}

那么,任何人都知道我应该怎么做才能将所有第一个代码放入 thread_clustering 中?

问候!

编辑:

这是分层聚类算法的一部分,如上所述:

  1. 从包含一个对象的 n 个集群开始
  2. 从邻近度中找到最相似的簇 Ci 和 Cj 对 矩阵并将它们合并到一个集群中
  3. 更新邻近矩阵(将其阶数减一,通过将单个集群替换为合并后的集群)
  4. 重复步骤 (2) e (3) 直到获得单个簇(即 N-1 次)

【问题讨论】:

  • double minimunValue = 9999999999999; 应有尽有:隐式转换、幻数、可能误导...
  • 代码有哪些具体问题?
  • @EOF 这只是一个数字,我正在寻找矩阵中的最小值
  • @4386427 我被要求并行实现层次聚类算法,这部分代码是我自己无法并行化的关键部分
  • @Claudio - 您能否添加一些您正在尝试解决的问题的描述(串行)以及您计划如何在线程之间分配工作。

标签: c pthreads semaphore


【解决方案1】:

对于第一组嵌套的for() 循环,它找到最近的一对城市,很容易以与其他循环相同的方式并行化它,以便每个线程在其部分中找到最近的一对城市搜索结果:

double minimumValue = DBL_MAX;

for (i = thread_counter; i < counter; i += numberOfThreads) {
    for (j = 0; j < counter; j++) {
        if ((distanceMatrix[i][j] < minimumValue) && (i != j)) {
            minimumValue = distanceMatrix[i][j];
            cityA = i;
            cityB = j;
        }
    }
}

但对于函数的其余部分,您需要在所有线程中使用最近的一组城市。因此,您需要一个“最近的城市”数组,每个线程一个,并在第一个循环之后让每个线程存储到该数组中:

closest[thread_counter].distance = minimumValue;
closest[thread_counter].cityA = cityA;
closest[thread_counter].cityB = cityB;

现在你需要等待其他线程存储它们的值,然后选择最小的一个:

barrier();

minimumValue = DBL_MAX;

for (i = 0; i < numberOfThreads; i++) {
    if (minimumValue > closest[i].distance) {
        minimumValue = closest[i].distance)
        cityA = closest[i].cityA;
        cityB = closest[i].cityB;
    }
}

(请注意,我们需要再次将minimumValue重新设置为最大值,以便所有线程在存在多个具有相同距离的选项时总是以相同的顺序选择相同的城市对。

请注意,您的 barrier() 实现存在问题 - 您应该将 mutex 信号量初始化为值 1(这样只有等待它的第一个线程才能继续执行直到它发布),并且 @ 987654329@ 信号量应该初始化为值0(这样所有等待它的线程都会被阻塞,直到它被发布)。信号量的初始值是sem_open()的最后一个参数。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-02
    相关资源
    最近更新 更多