【问题标题】:C++ std::thread stopping condition for thread pool线程池的 C++ std::thread 停止条件
【发布时间】:2012-08-16 13:06:10
【问题描述】:

我正在编写一个程序,它利用线程池在指定扩展名的文件中搜索与正则表达式匹配的文件。

我的线程池如下所示:

for( int i = 0; i < _nThreads; ++i )
    {
            _threads.push_back( thread( &ThreadPool::GrepFunc, this ) );
    }

运行函数如下所示:

void ThreadPool::GrepFunc()
{
    // implement a barrier

while( !_done )
{
    while( !_tasks.empty() )
    {
        fs::path task;
        bool gotTask = false;
        {
            lock_guard<mutex> tl( _taskMutex );
            if( !_tasks.empty() )
            {
                task = _tasks.front();
                _tasks.pop();
                gotTask = true;
            }
        }

        if( gotTask )
        {
            if( std::tr2::sys::is_directory( task ) )
            {
                for( fs::directory_iterator dirIter( task ), endIter; dirIter != endIter; ++dirIter )
                {
                    if( fs::is_directory( dirIter->path() ) )
                    {
                        { lock_guard<mutex> tl( _taskMutex );
                        _tasks.push( dirIter->path() ); }
                    }
                    else
                    {
                        for( auto& e : _args.extensions() )
                        {
                            if( !dirIter->path().extension().compare( e ) )
                            {
                                SearchFile( dirIter->path() );
                            }
                        }
                    }
                }
            }
            else
            {
                for( auto& e : _args.extensions() )
                {
                    if( !task.extension().compare( e ) )
                    {
                        SearchFile( task );
                    }
                }
            }
        }
    }
}
}

本质上,程序从用户那里接收一个初始目录,并将递归地搜索它和所有子目录以查找与扩展名匹配的文件,以寻找正则表达式匹配。我无法弄清楚如何确定何时达到 _done 的停止情况。我需要确保初始目录中的所有目录和文件都已被扫描,并且在我重新加入线程之前,_tasks 中的所有项目都已完成。任何想法都会非常感激。

【问题讨论】:

    标签: c++ multithreading


    【解决方案1】:

    我建议使用一个线程(可能是产生文件处理线程的同一线程)专门用于递归文件系统搜索匹配文件;它可以将文件添加到工作队列中,文件搜索线程可以从中获取工作。您可以使用条件变量来协调这一点。

    如您所见,协调关机有点棘手。文件系统搜索线程完成搜索后,它可以设置一些对工作线程可见的“刚刚完成队列中的内容”标志,然后通知它们全部唤醒并尝试处理另一个文件:如果他们发现文件/工作队列为空他们退出。然后文件系统搜索线程加入所有工作人员。

    【讨论】:

    • 我知道这是可行的,因为这实际上是我在我的程序的第一个版本中实现的方式。我只是有一个使用 recursive_directory_iterator 的循环,它进行扫描并传入与工作线程扩展匹配的文件。然而,通过对较大目录的测试,我发现大部分运行时间实际上都花在了递归搜索上,所以我一直在尝试对搜索本身进行线程化,并练习线程化和优化。如果我找不到解决方案,我会回到那个位置,但我真的希望找到一种方法来完成这项工作。
    • 好吧,我建议使用单个文件系统搜索线程的原因是,如果使用多个线程,您可能会发现磁盘磁头跳来跳去并最终导致性能下降,但这取决于您使用的磁盘技术使用:重度 RAID 磁盘将具有更好的并发性,SSD 更好的寻道时间。所以是的 - 你可以为子目录创建另一个队列来搜索......另一个工作线程池来扫描这些子目录并将子子目录添加到队列中。
    【解决方案2】:

    关于您在 Tony 回答的评论中更新的问题,我建议有两种任务:一种用于递归探索子目录,另一种用于 grep。您需要 SynQueue&lt;TaskBase&gt;TaskSubDir: TaskBaseTaskGrep: TaskBaseTaskBase 有一个虚拟接口函数Run()。然后线程就可以从SynQueue重复弹出,调用TaskBase::Run()

    1. 如果它有一个TaskSubDir,那么它会找到 给定路径中的子目录和文件: (a) 如果是文件夹,将新的子目录TaskSubDir添加到SynQueue,这样文件夹就可以用线程池递归搜索; (b) 如果它是一个匹配的文件 扩展,然后它将TaskGrep 推送到SynQueue
    2. 如果得到TaskGrep,则执行SearchFile
    3. 如果队列为空,break 退出工作函数。

    这样做,您不需要有 2 个队列并在启动 grep 队列之前等待子目录队列完成。

    所以回答你的问题:要确定加入条件,你需要做的就是等待所有线程到break 退出工作函数。

    最后说明:代码中的第一个 _tasks.empty() 不受互斥体保护,可能会受到竞速条件的影响。我建议您将 mutex 和 cond_var 隐藏在 SynQueue 类中,并添加 SynQueue::empty() 成员函数(受 mutex 保护)。如果您关心效率,您可能需要考虑 无锁队列 来替换 SynQueue

    【讨论】:

    • 我对你的回答有点困惑,因为我不太明白它与我的不同。就目前而言,我没有单独的队列。所有任务都在同一个队列中,因为它们是路径对象,我可以通过检查它们是否是目录来简单地处理它们,因此似乎不需要该级别的类抽象。另外,如果我使用队列空条件来打破我如何保证程序实际上已经完成搜索?时间可能会确定检查已完成,而没有任务存在,但即将添加一个任务。
    • @JesseCarter,主要区别在于您的工作函数在一个大的if (gotTask) 子句中处理子文件夹和文件;而我把它分解成更小的任务。这样做可以有效解决你提到的问题:有的线程脱离了worker函数,而实际上并不是所有的任务都被处理完。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-24
    • 1970-01-01
    • 2013-04-29
    • 2021-11-24
    • 1970-01-01
    相关资源
    最近更新 更多