【发布时间】:2014-03-11 21:45:14
【问题描述】:
我正在尝试使用 pthreads 做一个相当简单的搜索,我将一个文本文件分成 5 列,每个 5 个线程执行搜索。发生这种情况后,将发生合并以确定输入是否与文本文件中的任何单个条目匹配。但是,当我运行它时,它似乎挂起。我想我有各种各样的竞态条件,但是使用 DDD 进行调试还没有产生任何结果。
我认为我正在正确地创建和加入 pthread,但我不确定我是否正确使用互斥锁。
搜索功能在给定列中创建唯一值的链接列表,然后创建一个位掩码来保存找到该值的索引。无论如何,如果我运行标准线性搜索,我的测试数据会比线程执行多 7 个结果,我认为这意味着搜索或合并线程中出现了问题。将批量大小从我的数据集的 10% 块一次减少到 1 或 2 个值会增加匹配的数量,但我仍然错过了几个正确的总数。
如果有人知道程序在完成之前为何挂起(可能是由于搜索或合并线程),我将不胜感激。
为了简洁起见,我省略了大约一半的代码,但如果有帮助,我可以发布更多。
void * search(void* pthread_arg){
search_thread_param * myParams = (search_thread_param*) pthread_arg;
int k=0; // how many searches have been done so far
int m;
while (k<NUM_PACKETS){
pthread_mutex_lock(&mutex);
if(n !=0){
pthread_cond_wait(&search_cv, &mutex); // mege is in progress, go to sleep
}
pthread_mutex_unlock(&mutex); // release th lock so the other search threads can acquire it
for (m =0; m < BATCH; m++)
myParams->buffer[m] = mySearchFunction(myParams->packet_field[k+m], myParams->rule_field, unique_no[myParams->thread_id]); // given a value, return its BV
k=k+m;
pthread_mutex_lock(&mutex);
if (n==NUM_FIELDS-1) // this search thread is the only active one
{
n++;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&merge_cv); // wake up merge thread
}
else{ // not the last one goes to sleep
n++;
pthread_mutex_unlock(&mutex);
}
}
return NULL;
}
void merge_function(unsigned long b0,unsigned long b1,unsigned long b2,unsigned long b3,unsigned long b4){
unsigned long final_BV;
final_BV = b0 & b1 & b2 & b3 & b4;
if( final_BV )
int counter = 0;
while(final_BV){
counter++;
final_BV = final_BV >> 1; //bit shift
}
if( counter)
printf("matched rule %i\n", counter);
}
void * merge (void* pthread_arg){
merge_thread_param * myParams = (merge_thread_param*) pthread_arg;
int k=0; // how many merges have been done so far
int m;
pthread_mutex_lock(&mutex);
if (n<NUM_FIELDS) // the first batch is not ready
pthread_cond_wait(&merge_cv, &mutex); // go to sleep
while (k<NUM_PACKETS){
for (m =0; m < BATCH; m++){
merge_function(myParams->buffer[0][m],myParams->buffer[1][m],myParams->buffer[2][m],myParams->buffer[3][m],myParams->buffer[4][m]);
}
printf("Merge thread: finish merging one batch\n");
k=k+m;
n = 0; //ready to wake up all search threads
if(k ==NUM_PACKETS){ // the program is over
pthread_mutex_unlock(&mutex);
printf("Merge thread: I am done, found %i results, compared to %i linear search n = %i\n",res,lin,n);
return NULL;
}
else {
pthread_cond_broadcast(&search_cv);
pthread_mutex_unlock(&mutex);
pthread_cond_wait(&merge_cv, &mutex); // go to sleep
}
}
return NULL;
}
int main () {
///////setup info omitted for brevity
// create search threads
for (i=0; i< NUM_FIELDS; i++)
if(pthread_create(&s_thread[i], &attr, search, (void*) &stp[i])!=0){printf("Creating Thread failed!\n");}
// create merge thread
if(pthread_create(&m_thread, &attr, merge, (void*) &mtp)!=0){printf("Creating Thread failed!\n");}
// join the threads
for (i=0; i<NUM_FIELDS; i++)
if (pthread_join(s_thread[i], NULL)==0){printf("joined thread %i\n",i);}
else{printf("Joining thread %i failed!\n",i);}
if (pthread_join(m_thread, NULL)!=0){printf("joined merge thread");}
else{printf("Joining thread failed!\n");}
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&search_cv);
pthread_cond_destroy(&merge_cv);
//free dat memory
printf("finished joining all the threads");
fflush(stdout);
return 0;
}
【问题讨论】: