【问题标题】:read same structure on all threads在所有线程上读取相同的结构
【发布时间】:2013-09-13 15:22:16
【问题描述】:

我希望所有线程都从同一个结构中读取。我过去是通过在从结构中读取的循环中添加线程来实现的,但这次我需要在 void "dowork" 中打开结构,如我的示例所示。

我有以下代码:

struct word_list {
    char word[20];
    struct word_list * next;
};

struct word_list * first_word = NULL;
//other function which loads into struct is missing cause it's not relevant


//in main()
pthread_t thread_id[MAX_THREADS];
int max_thread = 10;
for(t = 0 ; t < max_thread; t++)
{
    pthread_mutex_lock(&thrd_list);
    arg_struct *args = calloc(1, sizeof(*args));
    args->file = file;
    args->t = t;
    args->e = ex;
    pthread_mutex_unlock(&thrd_list);
    if(pthread_create(&thread_id[t],NULL,dowork,args) != 0)
    {
        t--;
        fprintf(stderr,RED "\nError in creating thread\n" NONE);
    }
}

for(t = 0 ; t < max_thread; t++)
    if(pthread_join(thread_id[t],NULL) != 0)
    {
        fprintf(stderr,RED "\nError in joining thread\n" NONE);
    }




void *dowork(void *arguments)
{
    struct word_list * curr_word = first_word;
    char myword[20];
    while( curr_word != NULL )
    {
        pthread_mutex_lock(&thrd_list);
        strncpy(myword,curr_word->word,sizeof(myword) - 1);
        pthread_mutex_unlock(&thrd_list);

        //some irrelevant code is missing

        pthread_mutex_lock(&thrd_list);
        curr_word = curr_word->next;
        pthread_mutex_unlock(&thrd_list);
    }

}

如何在所有线程中从同一结构中读取不同的元素?

【问题讨论】:

  • 您要究竟做什么?每个线程都有一个完整的链表吗?每个线程是否处理列表中的一个节点?正如所写的那样,您有一个奇怪的保护循环设置,因为很容易想象一个或多个线程将只处理部分节点列表,因为它们在 main() 可以附加下一个节点之前读取了一个空的 next-ptr。
  • 我需要每个线程只处理列表中的一个节点,所有线程都处理同一个列表
  • 我删除了很多代码。在原来的没有空下一个错误,有一个条件
  • 如果每个线程只处理一个节点,那么线程proc中while循环的目的是什么?待办事项正是线程参数的原因,您是否有某些特定原因反对使用它?
  • 其实是有原因的。每个线程都使用一个套接字,并且在每个套接字上我必须进行相同的操作,但使用来自同一结构的不同节点

标签: c multithreading struct pthreads structure


【解决方案1】:

如果我现在了解您的要求(我想我终于明白了),您需要将您的单词列表视为工作队列。要做到这一点,需要一种通知机制,该机制允许将项目“推送”到队列中,以通知“拉取者”新数据可用。这样的系统确实存在于 pthread 中:条件变量互斥体和它们为控制流管理的谓词的结合.

这是一个如何使用它的例子。我已尝试为您记录每个步骤中发生的情况,希望您能理解。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

// defined the number of threads in our queue and the number
//  of test items for this demonstration.
#define MAX_THREADS  16
#define MAX_ITEMS    128*1024

typedef struct word_list
{
    char word[20];
    struct word_list * next;

} word_list;

// predicate values for the word list
struct word_list * first_word = NULL;   // current word.
int word_shutdown = 0;                  // shutdown state

// used for protecting our list.
pthread_mutex_t wq_mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wq_cv = PTHREAD_COND_INITIALIZER;

// worker proc
void *dowork(void*);

int main()
{
    pthread_t thread_id[MAX_THREADS];
    int i=0;

    // start thread pool
    for(i=0; i < MAX_THREADS; ++i)
        pthread_create(thread_id+i, NULL, dowork, NULL);

    // add MAX_ITEMS more entries, we need to latch since the
    //  work threads are actively processing the queue as we go.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);
    pthread_mutex_unlock(&wq_mtx);

    // queue is empty, but threads are all still there waiting. So
    //  do it again, just to proves the pool is still intact.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // again wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);

    // queue is empty, and we're not adding anything else. latch
    //  the mutex, set the shutdown flag, and tell all the threads.
    //  they need to terminate.
    word_shutdown = 1;
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_broadcast(&wq_cv);

    for (i=0;i<MAX_THREADS; ++i)
        pthread_join(thread_id[i], NULL);

    return EXIT_SUCCESS;
}


// the work crew will start by locking the mutex, then entering the
//  work loop, looking for entries or a shutdown state
void *dowork(void *arguments)
{
    int n_processed = 0;
    while (1)
    {
        pthread_mutex_lock(&wq_mtx);
        while (first_word == NULL && word_shutdown == 0)
            pthread_cond_wait(&wq_cv, &wq_mtx);

        // we own the mutex, and thus current access to the predicate
        //  values it protects.
        if (first_word != NULL)
        {
            // pull the item off the queue. once we do that we own the
            //  item, so we can unlatch and let another waiter know there
            //  may be more data on the queue.
            word_list *p = first_word;
            first_word = p->next;
            if (p->next)
                pthread_cond_signal(&wq_cv);
            pthread_mutex_unlock(&wq_mtx);

            //
            // TODO: process item here.
            //
            ++n_processed;
            free(p);
        }
        else if (word_shutdown != 0)
            break;
    }

    // we still own the mutex. report on how many items we received, then
    //  one more signal to let someone (anyone, actually) know we're done.
    pthread_t self = pthread_self();
    printf("%p : processed %d items.\n",self, n_processed);
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_signal(&wq_cv);
    return NULL;
}

示例输出:MAX_THREADS = 4(您的输出会有所不同)

0x100387000 : processed 64909 items.
0x100304000 : processed 64966 items.
0x1000b5000 : processed 64275 items.
0x100281000 : processed 67994 items.

示例输出:MAX_THREADS = 8

0x100304000 : processed 31595 items.
0x1000b5000 : processed 33663 items.
0x100593000 : processed 34298 items.
0x10040a000 : processed 32304 items.
0x10048d000 : processed 32406 items.
0x100387000 : processed 31878 items.
0x100281000 : processed 32317 items.
0x100510000 : processed 33683 items.

样本输出:MAX_THREADS = 16

0x10079f000 : processed 17239 items.
0x101081000 : processed 16530 items.
0x101104000 : processed 16662 items.
0x100699000 : processed 16562 items.
0x10040a000 : processed 16672 items.
0x100593000 : processed 15158 items.
0x10120a000 : processed 17365 items.
0x101187000 : processed 14184 items.
0x100387000 : processed 16332 items.
0x100616000 : processed 16497 items.
0x100281000 : processed 16632 items.
0x100304000 : processed 16222 items.
0x100510000 : processed 17188 items.
0x10048d000 : processed 15367 items.
0x1000b5000 : processed 16912 items.
0x10071c000 : processed 16622 items.

因为我们可以,启用完整的全局优化

示例输出:MAX_THREADS = 32,MAX_ITEMS = 4194304

0x109c58000 : processed 260000 items.
0x109634000 : processed 263433 items.
0x10973a000 : processed 262125 items.
0x10921c000 : processed 261201 items.
0x108d81000 : processed 262325 items.
0x109a4c000 : processed 262318 items.
0x108f8d000 : processed 263107 items.
0x109010000 : processed 261382 items.
0x109946000 : processed 262299 items.
0x109199000 : processed 261930 items.
0x10929f000 : processed 263506 items.
0x109093000 : processed 262362 items.
0x108e87000 : processed 262069 items.
0x108e04000 : processed 261890 items.
0x109acf000 : processed 261875 items.
0x1097bd000 : processed 262040 items.
0x109840000 : processed 261686 items.
0x1093a5000 : processed 262547 items.
0x109b52000 : processed 261980 items.
0x109428000 : processed 264259 items.
0x108f0a000 : processed 261620 items.
0x1095b1000 : processed 263062 items.
0x1094ab000 : processed 261811 items.
0x1099c9000 : processed 262709 items.
0x109116000 : processed 261628 items.
0x109bd5000 : processed 260905 items.
0x10952e000 : processed 262741 items.
0x1098c3000 : processed 260608 items.
0x109322000 : processed 261970 items.
0x1000b8000 : processed 262061 items.
0x100781000 : processed 262669 items.
0x1096b7000 : processed 262490 items.

嗯。我没有在其中任何一个中使用volatile。一定是时候买彩票了。

无论如何,我建议对 pthread 进行一些研究,特别是互斥体和条件变量控制及其交互。希望对您有所帮助。

【讨论】:

  • 非常感谢!我会在我回家后的几个小时内尝试一下。我会发布更新
  • +1 表示厚脸皮“我没有在任何这些中使用volatile”评论(当然还有答案;-)
  • @WhozCraig:我还有一个问题.. 如何影响您的示例,因为我不知道结构有多大,并且我希望记录按线程数分割而不是按 MAX_ITEMS ?
  • @SamReina 它们已经(相对)均匀地分布在线程之间。您在上面看到的计数应该可以证明这一点。您看到的输出是在我关闭队列之前线程在其生命周期内处理的项目数。关于不知道项目的数量,这完全取决于你。上面的示例必须在某个地方停止,但只要线程池不断提取项目,您的馈线端就没有理由不能无限期地推送项目。也许我不明白你的问题的上下文。
  • 不,你做得很好,再次感谢你的回答。我只需要弄清楚如何为我的代码实现您的解决方案。这有点难,尤其是我还在学习,你的一些代码对我来说有点科幻:)
【解决方案2】:

因此,您希望通过将工作拆分到多个线程来处理大量数据。您的解决方案效率不高,因为您的线程将与许多拥有互斥锁的人进行斗争,并且您无法确定工作是否均匀分布在所有线程中。因此,例如,线程 0 和 1 在首次访问互斥锁时可能正在完成所有工作,而所有其他线程一直处于空闲状态。

如果你想提高性能,你需要做到以下几点:-

  • 使所有线程相互独立,即删除同步数据
  • 确保线程之间的内存一致性,即确保项目 n+1 的数据紧邻项目 n 的数据。这有助于 CPU 更好地访问内存。频繁地在 RAM 中跳跃会产生大量缓存未命中,从而影响性能。

因此,在您的程序中,不是所有线程共享的单个链表,而是为每个线程创建一个链表:-

typedef struct _word_list
{
  //data
  struct _word_list *next;
} word_list;

static const int num_threads = 4; // actually, setting this to number of CPUs at run time would be better

word_list 
  *lists [num_threads] = {0};

void ReadWords ()
{
  word_list
    **current [num_threads];

  for (int i = 0 ; i < num_threads ; ++i)
  {
    current = &lists [i];
  }

  int destination = 0;

  while (read some valid input)
  {
    *current [destination] = malloc (sizeof (word_list));
    // set data 
    current [destination] = &current [destination]->next;

    destination = (destination + 1) % num_threads;
  }

  // data has now been read and stored into a series of linked lists, each list having
  // the same number of items (or one less)
}


void create_threads ()
{
   for (int i = 0 ; i < num_threads ; ++i)
   {
      // create thread, and pass it the value of lists [i]
   }
}

void do_work (...)
{
   for (word_list *item = passed in parameter ; item ; item = item->next)
   {
     process data
   }
}

在这个程序中(只是编的,没查过)我创建了四个链表,然后将数据均匀地分配给链表。然后我创建线程并给每个线程一个链表。然后每个线程处理自己的链表(它们是单独的链表)。

现在每个线程都可以全速运行,而无需等待互斥体来获取数据。内存访问是合理的,但在很大程度上取决于分配器。使用数组而不是链表可以改善这一点,但您需要在分配数组之前知道数据项的数量,这可能是不可能的。

【讨论】:

  • 数组比结构快吗?我应该更改当前存储的项目吗?
  • 我正在从文件中加载项目,以便在存储时对它们进行计数。数组是我的问题的解决方案吗?我喜欢你发布的想法,你的论点非常扎实。谢谢
  • @SamReina:数组提供了良好的内存使用率,但在创建数组之前您需要知道有多少项。如果您在开始读取文件之前就知道项目的数量,那么数组将比链表更好地工作。你可以有一个数组,并给每个线程一个开始索引和项目计数。
【解决方案3】:

让我看看我是否理解正确?

  • struct word_list 描述了某种链表
  • 您希望将该列表中的元素分散到线程中。

如果这是你想要的,那么我会从列表中一个一个地弹出元素并将指向其余元素的指针写回:

volatile struct word_list * first_word = NULL; // important to make it volatile

void *dowork(void *arguments)
{
    struct word_list * curr_word;
    char myword[20];
    do {
        // gain exclusive access to the control structures
        pthread_mutex_lock(&thrd_list);

        // get the next element
        curr_word = first_word;
        if (curr_word == NULL) {
            pthread_mutex_unlock(&thrd_list);
            break;
        }

        // notify the remaining threads what the next element is
        first_word = curr_word->next;

        pthread_mutex_unlock(&thrd_list);

        // do whatever you have to do

    } while (1);

}

如果您不想修改first_word,请创建一个额外的全局volatile struct word_list * next_word。确保将其设为volatile,否则编译器可能会执行导致奇怪结果的优化。

【讨论】:

  • volatile 不是线程同步的灵丹妙药; 同步对象,正确使用,是该武器的正确弹药。
  • @WhozCraig 不确定你在暗示什么。但答案是正确的。它正在正确使用互斥锁,并在需要的地方建议易失性。您是否暗示正确使用同步对象会使使用 volatile 变得不必要?
  • @Ziffusion 这正是我的意思。在 SO 上的volatile 上有很多问题和答案,所以我不会再跑到那个兔子洞里了。这不是它的预期用途。虽然是一个与 C++ 相关的问题,this is one of my more favorite links,因为它有 许多 个链接和讨论。
  • 我找不到此示例需要 volatile 的位置。
  • 你的代码中出现了死锁:当 break 被执行时,互斥锁没有被释放,所以只有第一个到达列表末尾的线程会退出,其余的将永远等待。跨度>
猜你喜欢
  • 1970-01-01
  • 2017-04-01
  • 2015-11-30
  • 2016-04-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多