【问题标题】:pthread execution time worse than sequentialpthread 执行时间比顺序执行时间差
【发布时间】:2019-12-19 02:16:05
【问题描述】:

我正在学习使用 pthread,希望它能帮助我的一些最慢的代码片段 快一点。我尝试(作为热身示例)使用 线程。我写了一个比较三种方法的代码:

  • 使用 NEVALS 被积函数评估积分的单线程 pthread 评估。
  • 使用 NEVALS 对积分 NTHREADS 时间进行多线程评估 积分评估。
  • 多个线程提交到我的 CPU 中的不同内核,再次总计 NEVALS*NTHREADS 积分评估。

在运行时,每个被积函数评估速度最快的是单核,比其他核快 2 到 3 倍。其他两个似乎有点等价,除了以下事实 CPU使用率非常不同,第二个将线程分布在所有(8)个内核上 在我的 CPU 中,而第三个(不出所料)将工作集中在 NTHREADS 中并留下其余部分 无人居住。

这里是来源:

#include <iostream>
#define __USE_GNU
#include <sched.h>
#include <pthread.h>
#include <thread>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <unistd.h>

using namespace std;

double aleatorio(double a, double b){
    double r = double(rand())/RAND_MAX;
    return a + r * (b - a);

}

double funct(double* a){
    return pow(a[0],6);
}

void EstimateBounds(int ndim, double (*f)(double*), double* bounds){
    double x[ndim];
    for(int i=1;i<=1000;i++){
        for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
        if ( f(x) > bounds[1]) bounds[1] = f(x);
        if ( f(x) < bounds[0]) bounds[0] = f(x);
    }
}

void Integrate(double (*f)(double*), int ndim, double* integral, int verbose, int seed){

    int nbatch = 5000000;
    const int maxeval = 25*nbatch;
    double x[ndim];
    srand(seed);




    /// Algorithm to estimate the maxima and minima ///
    for(int j=0;j<ndim;j++) x[j] = 0.5;
    double bounds[2] = {f(x),f(x)};
    EstimateBounds(ndim,f,bounds);

    /// Integral initialization ///
    int niter = int(maxeval/nbatch);


    for(int k=1;k<=niter;k++)
    {


        double loc_min = bounds[0];
        double loc_max = bounds[1];

        int count = 0;
        for (int i=1; i<=nbatch; i++)
        {
            for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
            double y = aleatorio(bounds[0],bounds[1]);
            if ( f(x) > loc_max )   loc_max = f(x);
            if ( f(x) < loc_min )   loc_min = f(x);
            if ( f(x) > y && y > 0 ) count++;
            if ( f(x) < y && y < 0 ) count--;

        }

        double delta = (bounds[1]-bounds[0])*double(count)/nbatch;
        integral[0]  += delta;
        integral[1] += pow(delta,2);
        bounds[0] = loc_min;
        bounds[1] = loc_max;

        if(verbose>0){
        cout << "Iteration["<<k<<"]: " << k*nbatch;
        cout << " integrand evaluations so far" <<endl;
        if(verbose>1){
            cout << "The bounds for this iteration were = ["<<bounds[0]<<","<<bounds[1]<<"]"<<endl;}
        cout << "Integral = ";
        cout << integral[0]/k << " +- "; 
        cout << sqrt((integral[1]/k - pow(integral[0]/k,2)))/(k) << endl;
        cout << endl;
        }

    }
    integral[0] /= niter;
    integral[1] = sqrt((integral[1]/niter - pow(integral[0],2)))/niter;

}

struct IntegratorArguments{

    double (*Integrand)(double*);
    int NumberOfVariables;
    double* Integral;
    int VerboseLevel;
    int Seed;

};

void LayeredIntegrate(IntegratorArguments IA){
    Integrate(IA.Integrand,IA.NumberOfVariables,IA.Integral,IA.VerboseLevel,IA.Seed);
}

void ThreadIntegrate(void * IntArgs){
    IntegratorArguments *IA = (IntegratorArguments*)IntArgs;
    LayeredIntegrate(*IA);
    pthread_exit(NULL);

}

#define NTHREADS 5

int main(void)
{

   cout.precision(16);
   bool execute_single_core = true;
   bool execute_multi_core = true;
   bool execute_multi_core_2 = true;

   ///////////////////////////////////////////////////////////////////////////
   ///
   ///          Single Thread Execution 
   ///
   ///////////////////////////////////////////////////////////////////////////

    if(execute_single_core){
    pthread_t thr0;
    double integral_value0[2] = {0,0};
    IntegratorArguments IntArg0;
    IntArg0.Integrand = funct;
    IntArg0.NumberOfVariables = 2;
    IntArg0.VerboseLevel = 0;
    IntArg0.Seed = 1;

    IntArg0.Integral = integral_value0;
    int t = time(NULL);
    cout << "Now Attempting to create thread "<<0<<endl;
    int rc0 = 0;
    rc0 = pthread_create(&thr0, NULL, ThreadIntegrate,&IntArg0);   
    if (rc0) {
        cout << "Error:unable to create thread," << rc0 << endl;
        exit(-1);
    }
    else cout << "Thread "<<0<<" has been succesfuly created" << endl;
    pthread_join(thr0,NULL);
    cout << "Thread 0 has finished, it took " << time(NULL)-t <<" secs to finish"  << endl;
    cout << "Integral Value = "<< integral_value0[0] << "+/-" << integral_value0[1] <<endl;
    }


    ////////////////////////////////////////////////////////////////////////////////
    ///
    ///             Multiple Threads Creation 
    ///
    /////////////////////////////////////////////////////////////////////////////// 

    if(execute_multi_core){

    pthread_t threads[NTHREADS];
    double integral_value[NTHREADS][2];
    IntegratorArguments IntArgs[NTHREADS];
    int rc[NTHREADS];
    for(int i=0;i<NTHREADS;i++){
        integral_value[i][0]=0;
        integral_value[i][1]=0;
        IntArgs[i].Integrand = funct;
        IntArgs[i].NumberOfVariables = 2;
        IntArgs[i].VerboseLevel = 0;
        IntArgs[i].Seed = i;
        IntArgs[i].Integral = integral_value[i];        
    }

    int t = time(NULL);
    for(int i=0;i<NTHREADS;i++){
        cout << "Now Attempting to create thread "<<i<<endl;
        rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
        if (rc[i]) {
            cout << "Error:unable to create thread," << rc[i] << endl;
            exit(-1);
        }
        else cout << "Thread "<<i<<" has been succesfuly created" << endl;
    }
    /// Thread Waiting Phase ///
    for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
    cout << "All threads have now finished" <<endl;
    cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
    cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
    for(int i = 0; i < NTHREADS; i++ ) {
        cout << "Thread " << i << " has as the value for the integral" << endl;
        cout << "Integral = ";
        cout << integral_value[i][0] << " +- "; 
        cout << integral_value[i][1] << endl;
    }

    }

    ////////////////////////////////////////////////////////////////////////
    ///
    ///             Multiple Cores Execution 
    ///
    ///////////////////////////////////////////////////////////////////////


    if(execute_multi_core_2){

    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);

    pthread_t threads[NTHREADS];
    double integral_value[NTHREADS][2];
    IntegratorArguments IntArgs[NTHREADS];
    int rc[NTHREADS];
    for(int i=0;i<NTHREADS;i++){
        integral_value[i][0]=0;
        integral_value[i][1]=0;
        IntArgs[i].Integrand = funct;
        IntArgs[i].NumberOfVariables = 2;
        IntArgs[i].VerboseLevel = 0;
        IntArgs[i].Seed = i;
        IntArgs[i].Integral = integral_value[i];        
    }

    int t = time(NULL);
    for(int i=0;i<NTHREADS;i++){
        cout << "Now Attempting to create thread "<<i<<endl;
        rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
        if (rc[i]) {
            cout << "Error:unable to create thread," << rc[i] << endl;
            exit(-1);
        }
        else cout << "Thread "<<i<<" has been succesfuly created" << endl;
        CPU_SET(i, &cpuset);
    }

    cout << "Now attempting to commit different threads to different cores" << endl;
    for(int i=0;i<NTHREADS;i++){
        const int set_result = pthread_setaffinity_np(threads[i], sizeof(cpu_set_t), &cpuset);
        if(set_result) cout << "Error: Thread "<<i<<" could not be commited to a new core"<<endl;
        else cout << "Thread reassignment succesful" << endl;
    }

    /// Thread Waiting Phase ///
    for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
    cout << "All threads have now finished" <<endl;
    cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
    cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
    for(int i = 0; i < NTHREADS; i++ ) {
        cout << "Thread " << i << " has as the value for the integral" << endl;
        cout << "Integral = ";
        cout << integral_value[i][0] << " +- "; 
        cout << integral_value[i][1] << endl;
    }

    }



pthread_exit(NULL);

}

我编译 g++ -std=c++11 -w -fpermissive -O3 SOURCE.cpp -lpthread

在我看来,我的线程实际上是按顺序执行的,因为 时间似乎随着 NTHREADS 的增加而增长,实际上它需要大约 NTHREADS 倍的时间 而不是一个线程。

有人知道瓶颈在哪里吗?

【问题讨论】:

  • rand 不是线程安全的,但我不知道这是否是瓶颈的根源。此外,您只需为任何给定的x 调用一次f(x),而不是现在调用它的 4 或 5 次。
  • 任何共享数据在具有不同缓存的内核上运行都会变慢。
  • 所以,你有一个任务 X 工作,需要 Y 运行。而且,据我所见,这与启动每个执行 X 工作的 N 个线程相比,您想知道为什么总体上做 X*N 的工作量比 X 长,总时间比 Y 长?这是一个准确的胶囊总结吗,为什么执行 X N 次,然后执行 X 一次需要更长的时间,即使这 N 次是由 N 个线程完成的?
  • -w -fpermissive 与使用的安全标志几乎相反。它禁止对不符合 C++ 标准的危险事物进行任何诊断。例如,您的程序使用错误的返回类型 ThreadIntegrate 以便将其传递给 pthread_create,因此您的程序在标准下具有未定义的行为。它是否会工作现在取决于编译器和 pthreads 实现。还请考虑使用std::thread(显然您有可用的 C++11),它将 pthreads 包装在适当的 C++ API 中。没有充分的理由直接使用 pthread。
  • 另外,尝试注释掉线程中对cout 的写入,看看是否有任何影响。

标签: c++ multithreading


【解决方案1】:

您正在使用rand(),它是一个全局随机数生成器。首先它不是线程安全的,所以在多个线程中使用它,可能是并行的,会导致未定义的行为。

即使我们把它放在一边,rand() 正在使用一个由所有线程共享的全局实例。如果一个线程想要调用它,处理器内核需要检查其他内核是否修改了它的状态,并且每次使用时都需要从主存储器或其他缓存中重新获取该状态。这就是您观察到性能下降的原因。

使用&lt;random&gt; 工具代替伪随机数生成器。它们提供质量更好的随机数生成器、随机数分布以及创建多个独立随机数生成器实例的能力。制作这些thread_local,这样线程就不会相互干扰:

double aleatorio(double a, double b){
    thread_local std::mt19937 rng{/*seed*/};
    return std::uniform_real_distribution<double>{a, b}(rng);
}

请注意,虽然这没有为std::mt19937 使用正确的种子,但请参阅this question 了解详细信息,并且uniform_real_distribution&lt;double&gt;{a, b} 将返回aa 之间的均匀分布数字exclusive时间>。您的原始代码给出了一个介于ab 之间的数字包含(除了可能的舍入错误)。我认为两者都与您无关。

还请注意我在您的问题下不相关的 cmets,以了解您应该改进的其他方面。

【讨论】:

  • 谢谢@walnut,这解决了我的问题,帖子有错字,第二行应该是:thread_local std::mt19937 rng{std::random_device{}()};
  • -fpermissive 是我 100% 的懒惰,通过更改 ThreadIntegrate 以返回一个 void 指针(并实际返回它)我可以删除该标志,谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-12-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-08-13
  • 2011-09-18
  • 2021-11-01
相关资源
最近更新 更多