【发布时间】:2019-12-19 02:16:05
【问题描述】:
我正在学习使用 pthread,希望它能帮助我的一些最慢的代码片段 快一点。我尝试(作为热身示例)使用 线程。我写了一个比较三种方法的代码:
- 使用 NEVALS 被积函数评估积分的单线程 pthread 评估。
- 使用 NEVALS 对积分 NTHREADS 时间进行多线程评估 积分评估。
- 多个线程提交到我的 CPU 中的不同内核,再次总计 NEVALS*NTHREADS 积分评估。
在运行时,每个被积函数评估速度最快的是单核,比其他核快 2 到 3 倍。其他两个似乎有点等价,除了以下事实 CPU使用率非常不同,第二个将线程分布在所有(8)个内核上 在我的 CPU 中,而第三个(不出所料)将工作集中在 NTHREADS 中并留下其余部分 无人居住。
这里是来源:
#include <iostream>
#define __USE_GNU
#include <sched.h>
#include <pthread.h>
#include <thread>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <unistd.h>
using namespace std;
double aleatorio(double a, double b){
double r = double(rand())/RAND_MAX;
return a + r * (b - a);
}
double funct(double* a){
return pow(a[0],6);
}
void EstimateBounds(int ndim, double (*f)(double*), double* bounds){
double x[ndim];
for(int i=1;i<=1000;i++){
for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
if ( f(x) > bounds[1]) bounds[1] = f(x);
if ( f(x) < bounds[0]) bounds[0] = f(x);
}
}
void Integrate(double (*f)(double*), int ndim, double* integral, int verbose, int seed){
int nbatch = 5000000;
const int maxeval = 25*nbatch;
double x[ndim];
srand(seed);
/// Algorithm to estimate the maxima and minima ///
for(int j=0;j<ndim;j++) x[j] = 0.5;
double bounds[2] = {f(x),f(x)};
EstimateBounds(ndim,f,bounds);
/// Integral initialization ///
int niter = int(maxeval/nbatch);
for(int k=1;k<=niter;k++)
{
double loc_min = bounds[0];
double loc_max = bounds[1];
int count = 0;
for (int i=1; i<=nbatch; i++)
{
for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
double y = aleatorio(bounds[0],bounds[1]);
if ( f(x) > loc_max ) loc_max = f(x);
if ( f(x) < loc_min ) loc_min = f(x);
if ( f(x) > y && y > 0 ) count++;
if ( f(x) < y && y < 0 ) count--;
}
double delta = (bounds[1]-bounds[0])*double(count)/nbatch;
integral[0] += delta;
integral[1] += pow(delta,2);
bounds[0] = loc_min;
bounds[1] = loc_max;
if(verbose>0){
cout << "Iteration["<<k<<"]: " << k*nbatch;
cout << " integrand evaluations so far" <<endl;
if(verbose>1){
cout << "The bounds for this iteration were = ["<<bounds[0]<<","<<bounds[1]<<"]"<<endl;}
cout << "Integral = ";
cout << integral[0]/k << " +- ";
cout << sqrt((integral[1]/k - pow(integral[0]/k,2)))/(k) << endl;
cout << endl;
}
}
integral[0] /= niter;
integral[1] = sqrt((integral[1]/niter - pow(integral[0],2)))/niter;
}
struct IntegratorArguments{
double (*Integrand)(double*);
int NumberOfVariables;
double* Integral;
int VerboseLevel;
int Seed;
};
void LayeredIntegrate(IntegratorArguments IA){
Integrate(IA.Integrand,IA.NumberOfVariables,IA.Integral,IA.VerboseLevel,IA.Seed);
}
void ThreadIntegrate(void * IntArgs){
IntegratorArguments *IA = (IntegratorArguments*)IntArgs;
LayeredIntegrate(*IA);
pthread_exit(NULL);
}
#define NTHREADS 5
int main(void)
{
cout.precision(16);
bool execute_single_core = true;
bool execute_multi_core = true;
bool execute_multi_core_2 = true;
///////////////////////////////////////////////////////////////////////////
///
/// Single Thread Execution
///
///////////////////////////////////////////////////////////////////////////
if(execute_single_core){
pthread_t thr0;
double integral_value0[2] = {0,0};
IntegratorArguments IntArg0;
IntArg0.Integrand = funct;
IntArg0.NumberOfVariables = 2;
IntArg0.VerboseLevel = 0;
IntArg0.Seed = 1;
IntArg0.Integral = integral_value0;
int t = time(NULL);
cout << "Now Attempting to create thread "<<0<<endl;
int rc0 = 0;
rc0 = pthread_create(&thr0, NULL, ThreadIntegrate,&IntArg0);
if (rc0) {
cout << "Error:unable to create thread," << rc0 << endl;
exit(-1);
}
else cout << "Thread "<<0<<" has been succesfuly created" << endl;
pthread_join(thr0,NULL);
cout << "Thread 0 has finished, it took " << time(NULL)-t <<" secs to finish" << endl;
cout << "Integral Value = "<< integral_value0[0] << "+/-" << integral_value0[1] <<endl;
}
////////////////////////////////////////////////////////////////////////////////
///
/// Multiple Threads Creation
///
///////////////////////////////////////////////////////////////////////////////
if(execute_multi_core){
pthread_t threads[NTHREADS];
double integral_value[NTHREADS][2];
IntegratorArguments IntArgs[NTHREADS];
int rc[NTHREADS];
for(int i=0;i<NTHREADS;i++){
integral_value[i][0]=0;
integral_value[i][1]=0;
IntArgs[i].Integrand = funct;
IntArgs[i].NumberOfVariables = 2;
IntArgs[i].VerboseLevel = 0;
IntArgs[i].Seed = i;
IntArgs[i].Integral = integral_value[i];
}
int t = time(NULL);
for(int i=0;i<NTHREADS;i++){
cout << "Now Attempting to create thread "<<i<<endl;
rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
if (rc[i]) {
cout << "Error:unable to create thread," << rc[i] << endl;
exit(-1);
}
else cout << "Thread "<<i<<" has been succesfuly created" << endl;
}
/// Thread Waiting Phase ///
for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
cout << "All threads have now finished" <<endl;
cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
for(int i = 0; i < NTHREADS; i++ ) {
cout << "Thread " << i << " has as the value for the integral" << endl;
cout << "Integral = ";
cout << integral_value[i][0] << " +- ";
cout << integral_value[i][1] << endl;
}
}
////////////////////////////////////////////////////////////////////////
///
/// Multiple Cores Execution
///
///////////////////////////////////////////////////////////////////////
if(execute_multi_core_2){
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
pthread_t threads[NTHREADS];
double integral_value[NTHREADS][2];
IntegratorArguments IntArgs[NTHREADS];
int rc[NTHREADS];
for(int i=0;i<NTHREADS;i++){
integral_value[i][0]=0;
integral_value[i][1]=0;
IntArgs[i].Integrand = funct;
IntArgs[i].NumberOfVariables = 2;
IntArgs[i].VerboseLevel = 0;
IntArgs[i].Seed = i;
IntArgs[i].Integral = integral_value[i];
}
int t = time(NULL);
for(int i=0;i<NTHREADS;i++){
cout << "Now Attempting to create thread "<<i<<endl;
rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
if (rc[i]) {
cout << "Error:unable to create thread," << rc[i] << endl;
exit(-1);
}
else cout << "Thread "<<i<<" has been succesfuly created" << endl;
CPU_SET(i, &cpuset);
}
cout << "Now attempting to commit different threads to different cores" << endl;
for(int i=0;i<NTHREADS;i++){
const int set_result = pthread_setaffinity_np(threads[i], sizeof(cpu_set_t), &cpuset);
if(set_result) cout << "Error: Thread "<<i<<" could not be commited to a new core"<<endl;
else cout << "Thread reassignment succesful" << endl;
}
/// Thread Waiting Phase ///
for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
cout << "All threads have now finished" <<endl;
cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
for(int i = 0; i < NTHREADS; i++ ) {
cout << "Thread " << i << " has as the value for the integral" << endl;
cout << "Integral = ";
cout << integral_value[i][0] << " +- ";
cout << integral_value[i][1] << endl;
}
}
pthread_exit(NULL);
}
我编译 g++ -std=c++11 -w -fpermissive -O3 SOURCE.cpp -lpthread
在我看来,我的线程实际上是按顺序执行的,因为 时间似乎随着 NTHREADS 的增加而增长,实际上它需要大约 NTHREADS 倍的时间 而不是一个线程。
有人知道瓶颈在哪里吗?
【问题讨论】:
-
rand不是线程安全的,但我不知道这是否是瓶颈的根源。此外,您只需为任何给定的x调用一次f(x),而不是现在调用它的 4 或 5 次。 -
任何共享数据在具有不同缓存的内核上运行都会变慢。
-
所以,你有一个任务 X 工作,需要 Y 运行。而且,据我所见,这与启动每个执行 X 工作的 N 个线程相比,您想知道为什么总体上做 X*N 的工作量比 X 长,总时间比 Y 长?这是一个准确的胶囊总结吗,为什么执行 X N 次,然后执行 X 一次需要更长的时间,即使这 N 次是由 N 个线程完成的?
-
-w -fpermissive与使用的安全标志几乎相反。它禁止对不符合 C++ 标准的危险事物进行任何诊断。例如,您的程序使用错误的返回类型ThreadIntegrate以便将其传递给pthread_create,因此您的程序在标准下具有未定义的行为。它是否会工作现在取决于编译器和 pthreads 实现。还请考虑使用std::thread(显然您有可用的 C++11),它将 pthreads 包装在适当的 C++ API 中。没有充分的理由直接使用 pthread。 -
另外,尝试注释掉线程中对
cout的写入,看看是否有任何影响。
标签: c++ multithreading