【问题标题】:pthread_join - multiple threads waitingpthread_join - 多个线程等待
【发布时间】:2009-01-10 15:06:29
【问题描述】:

使用 POSIX 线程和 C++,我有一个“插入操作”,一次只能安全地完成一个。

如果我有多个线程等待使用 pthread_join 插入,则生成一个新线程 当它完成时。它们是否都会同时收到“线程完成”信号并产生多个插入,或者假设首先收到“线程完成”信号的线程将产生一个新线程,阻止其他线程创建新线程。

/* --- GLOBAL --- */
pthread_t insertThread;



/* --- DIFFERENT THREADS --- */
// Wait for Current insert to finish
pthread_join(insertThread, NULL); 

// Done start a new one
pthread_create(&insertThread, NULL, Insert, Data);

感谢您的回复

该程序基本上是一个巨大的哈希表,它通过套接字接收来自客户端的请求。

每个新的客户端连接都会产生一个新线程,然后它可以从中执行多个操作,特别是查找或插入。查找可以并行进行。但是插入需要“重新组合”到一个线程中。您可以说查找操作可以在不为客户端生成新线程的情况下完成,但是它们可能需要一段时间导致服务器锁定,从而丢弃新请求。该设计试图尽可能减少系统调用和线程创建。

但现在我知道我最初认为我应该能够拼凑一些东西的方式并不安全

谢谢

【问题讨论】:

    标签: c++ multithreading posix


    【解决方案1】:

    来自opengroup.org on pthread_join

    多个同时调用 pthread_join() 指定同一目标线程的结果未定义。

    所以,你真的不应该有多个线程加入你之前的 insertThread。

    首先,当你使用 C++ 时,我推荐boost.thread。它们类似于线程的 POSIX 模型,并且也适用于 Windows。它还可以帮助您使用 C++,即让函数对象更容易使用。

    第二,你为什么要启动一个新线程来插入元素,而你总是要等待前一个线程完成才能开始下一个线程?似乎不是多线程的经典用法。

    虽然... 一个经典的解决方案是让一个工作线程从事件队列中获取作业,而其他线程将操作发布到事件队列上。

    如果你真的只是想或多或少地保持现在的样子,你必须这样做:

    • 创建一个条件变量,例如insert_finished
    • 所有想要插入的线程,等待条件变量。
    • 一旦一个线程完成插入,它就会触发条件变量。
    • 由于条件变量需要互斥体,您可以通知所有个等待线程,它们都想开始插入,但由于一次只有一个线程可以获取互斥体,所以所有线程都会执行按顺序插入。

    但是您应该注意不要以过于临时的方式实现同​​步。由于这称为insert,我怀疑您想要操作数据结构,因此您可能希望首先实现线程安全的数据结构,而不是共享数据结构访问和所有客户端之间的同步。我还怀疑除了insert之外会有更多的操作,这需要适当的同步......

    【讨论】:

      【解决方案2】:

      根据 Single Unix Specifcation:“多个同时调用 pthread_join() 指定同一目标线程的结果是未定义的。”

      实现单线程获取任务的“正常方式”是设置一个条件变量(不要忘记相关的互斥锁):空闲线程在 pthread_cond_wait()(或 pthread_cond_timedwait())中等待,并且当完成工作的线程完成时,它会使用 pthread_cond_signal() 唤醒空闲线程之一。

      【讨论】:

        【解决方案3】:

        是的,正如大多数人推荐的那样,最好的方法似乎是让工作线程从队列中读取。下面是一些代码sn-ps

            pthread_t       insertThread = NULL;
            pthread_mutex_t insertConditionNewMutex = PTHREAD_MUTEX_INITIALIZER;
            pthread_mutex_t insertConditionDoneMutex    = PTHREAD_MUTEX_INITIALIZER;
            pthread_cond_t  insertConditionNew      = PTHREAD_COND_INITIALIZER;
            pthread_cond_t  insertConditionDone     = PTHREAD_COND_INITIALIZER;
        
               //Thread for new incoming connection
                void * newBatchInsert()
                {
                   for(each Word)
                   {
                                    //Push It into the queue
                                    pthread_mutex_lock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);
                                        lexicon[newPendingWord->length - 1]->insertQueue.push(newPendingWord);
                                    pthread_mutex_unlock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);
        
                   }
        
                            //Send signal to worker Thread
                            pthread_mutex_lock(&insertConditionNewMutex);
                                pthread_cond_signal(&insertConditionNew);
                            pthread_mutex_unlock(&insertConditionNewMutex);
        
                            //Wait Until it's finished
                            pthread_cond_wait(&insertConditionDone, &insertConditionDoneMutex);
        
                }
        
        
                    //Worker thread
                    void * insertWorker(void *)
                    {
        
                        while(1)        
                        {
        
                            pthread_cond_wait(&insertConditionNew, &insertConditionNewMutex);
        
                            for (int ii = 0; ii < maxWordLength; ++ii)
                            {                   
                                    while (!lexicon[ii]->insertQueue.empty())
                                    {
        
                                        queueNode * newPendingWord = lexicon[ii]->insertQueue.front();
        
        
                                        lexicon[ii]->insert(newPendingWord->word);
        
                                        pthread_mutex_lock(&lexicon[ii]->insertQueueMutex);
                                        lexicon[ii]->insertQueue.pop();
                                        pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex);
        
                                    }
        
                            }
        
                            //Send signal that it's done
                            pthread_mutex_lock(&insertConditionDoneMutex);
                                pthread_cond_broadcast(&insertConditionDone);
                            pthread_mutex_unlock(&insertConditionDoneMutex);
        
                        }
        
                    }
        
                    int main(int argc, char * const argv[]) 
                    {
        
                        pthread_create(&insertThread, NULL, &insertWorker, NULL);
        
        
                        lexiconServer = new server(serverPort, (void *) newBatchInsert);
        
                        return 0;
                    }
        

        【讨论】:

          【解决方案4】:

          其他人已经指出这具有未定义的行为。我只想补充一点,完成任务的最简单的方法(只允许一个线程执行部分代码)是使用一个简单的互斥锁 - 你需要执行该代码的线程是互斥的,这就是互斥锁出现的地方它的名字:-)

          如果您需要在特定线程中运行代码(如 Java AWT),那么您需要条件变量。但是,您应该三思而后行,这种解决方案是否真的有回报。想象一下,如果您每秒调用 10000 次“插入操作”,您需要多少次上下文切换。

          【讨论】:

            【解决方案5】:

            正如您刚才提到的,您使用的哈希表具有与插入并行的多个查找,我建议您检查是否可以使用并发哈希表。

            当您同时插入元素时,由于确切的查找结果是不确定的,因此这样的并发散列映射可能正是您所需要的。不过,我没有在 C++ 中使用并发哈希表,但由于它们在 Java 中可用,您肯定会找到一个在 C++ 中执行此操作的库。

            【讨论】:

              【解决方案6】:

              我发现的唯一一个支持插入而不锁定新查找的库 - Sunrise DD(我不确定它是否支持并发插入)

              但是,从 Google 的 Sparse Hash map 切换后,内存使用量增加了一倍多。查找应该很少发生,而不是尝试编写我自己的库 它结合了两者的优点,我宁愿在安全地进行更改时锁定表暂停查找。

              再次感谢

              【讨论】:

                【解决方案7】:

                在我看来,您想将插入序列化到哈希表中。

                为此,您需要一个锁 - 不产生新线程。

                【讨论】:

                  【解决方案8】:

                  根据您的描述,您每次想要插入某些东西时都在重新创建插入线程,因此看起来效率很低。创建线程的成本不是0。

                  这个问题的一个更常见的解决方案是产生一个在队列中等待的插入线程(即,当循环为空时,它处于一个休眠的循环中)。然后其他线程将工作项添加到队列中。插入线程按照添加顺序(或根据需要按优先级)选择队列中的项目并执行相应的操作。

                  您所要做的就是确保对队列的添加受到保护,以便一次只有一个线程可以访问修改实际队列,并且插入线程不会进行忙碌等待,而是在没有任何内容时休眠在队列中(见条件变量)。

                  【讨论】:

                    【解决方案9】:

                    理想情况下,您不希望单个进程中有多个线程池,即使它们执行不同的操作。线程的可重用性是一个重要的架构定义,如果使用 C,它会导致在主线程中创建 pthread_join。

                    当然,对于 C++ 线程池(又名 ThreadFactory),其想法是保持线程原语抽象,因此它可以处理传递给它的任何函数/操作类型。

                    一个典型的例子是一个网络服务器,它具有连接池和线程池,它们为连接提供服务,然后进一步处理它们,但是,它们都来自一个通用的线程池进程。

                    总结:避免在主线程以外的任何地方使用 PTHREAD_JOIN。

                    【讨论】:

                      猜你喜欢
                      • 1970-01-01
                      • 1970-01-01
                      • 1970-01-01
                      • 2015-08-23
                      • 1970-01-01
                      • 2023-03-11
                      • 2021-05-05
                      • 1970-01-01
                      • 2017-10-27
                      相关资源
                      最近更新 更多