【问题标题】:Race condition between terminating worker threads and main thread终止工作线程和主线程之间的竞争条件
【发布时间】:2013-06-13 00:44:12
【问题描述】:

我在从主线程终止工作线程时遇到问题。到目前为止,我尝试的每种方法都会导致竞争条件或死锁。

工作线程存储在一个名为 ThreadPool 的类中的内部类中,ThreadPool 使用 unique_ptr 维护这些 WorkerThread 的向量。

这是我的线程池的标题:

class ThreadPool
{
public:
typedef void (*pFunc)(const wpath&, const Args&, Global::mFile_t&, std::mutex&, std::mutex&);       // function to point to
private:

    class WorkerThread
    {
    private:
        ThreadPool* const _thisPool;        // reference enclosing class

        // pointers to arguments
        wpath _pPath;               // member argument that will be modifyable to running thread
        Args * _pArgs;
        Global::mFile_t * _pMap;

        // flags for thread management
        bool _terminate;                    // terminate thread
        bool _busy;                         // is thread busy?
        bool _isRunning;

        // thread management members

        std::mutex              _threadMtx;
        std::condition_variable _threadCond;
        std::thread             _thisThread;

        // exception ptr
        std::exception_ptr _ex;

        // private copy constructor
        WorkerThread(const WorkerThread&): _thisPool(nullptr) {}
    public:
        WorkerThread(ThreadPool&, Args&, Global::mFile_t&);
        ~WorkerThread();

        void setPath(const wpath);          // sets a new task
        void terminate();                   // calls terminate on thread
        bool busy() const;                  // returns whether thread is busy doing task
        bool isRunning() const;             // returns whether thread is still running
        void join();                        // thread join wrapper
        std::exception_ptr exception() const;

        // actual worker thread running tasks
        void thisWorkerThread();
    };

    // thread specific information
    DWORD _numProcs;                        // number of processors on system
    unsigned _numThreads;                   // number of viable threads
    std::vector<std::unique_ptr<WorkerThread>> _vThreads;   // stores thread pointers - workaround for no move constructor in WorkerThread
    pFunc _task;                            // the task threads will call

    // synchronization members
    unsigned _barrierLimit;                 // limit before barrier goes down
    std::mutex _barrierMtx;                 // mutex for barrier
    std::condition_variable _barrierCond;   // condition for barrier
    std::mutex _coutMtx;

public:
    // argument mutex
    std::mutex matchesMap_mtx;
    std::mutex coutMatch_mtx;

    ThreadPool(pFunc f);

    // wake a thread and pass it a new parameter to work on
    void callThread(const wpath&);

    // barrier synchronization
    void synchronizeStartingThreads();

    // starts and synchronizes all threads in a sleep state
    void startThreads(Args&, Global::mFile_t&);

    // terminate threads
    void terminateThreads();

private:
};

到目前为止,我遇到的真正问题是从主线程调用 terminateThreads() 导致死锁或竞争条件。

当我将 _terminate 标志设置为 true 时,主线程可能已经退出作用域并在线程有机会唤醒和终止之前销毁所有互斥锁。事实上,我已经多次遇到这种崩溃(控制台窗口显示:mutex 在忙时被破坏)

如果我在 notify_all() 线程之后添加 thread.join(),则线程有可能在连接发生之前终止,从而导致无限死锁,因为加入已终止的线程会无限期地挂起程序。

如果我分离 - 与上述相同的问题,但会导致程序崩溃

如果我改为使用 while(WorkerThread.isRunning()) Sleep(0); 程序可能会崩溃,因为主线程可能在 WorkerThread 到达最后一个右大括号之前退出。

在所有工作线程安全终止之前,我不确定还可以做什么来停止 main。此外,即使在 thread 和 main 中使用 try-catch,也不会捕获任何异常。 (我尝试过的一切都会导致程序崩溃)

在工作线程完成之前,我可以做些什么来暂停主线程?

以下是主要功能的实现:

终止单个工作线程

void ThreadPool::WorkerThread::terminate()
{
    _terminate = true;
    _threadCond.notify_all();
    _thisThread.join();
}

实际的线程循环

void ThreadPool::WorkerThread::thisWorkerThread()
{
    _thisPool->synchronizeStartingThreads();

    try
    {
        while (!_terminate)
        {
            {
                _thisPool->_coutMtx.lock();
                std::cout << std::this_thread::get_id() << " Sleeping..." << std::endl;
                _thisPool->_coutMtx.unlock();
                _busy = false;
                std::unique_lock<std::mutex> lock(_threadMtx);
                _threadCond.wait(lock);
            }
            _thisPool->_coutMtx.lock();
            std::cout << std::this_thread::get_id() << " Awake..." << std::endl;
            _thisPool->_coutMtx.unlock();
            if(_terminate)
                break;

            _thisPool->_task(_pPath, *_pArgs, *_pMap, _thisPool->coutMatch_mtx, _thisPool->matchesMap_mtx);

            _thisPool->_coutMtx.lock();
            std::cout << std::this_thread::get_id() << " Finished Task..." << std::endl;
            _thisPool->_coutMtx.unlock();

        }
        _thisPool->_coutMtx.lock();
        std::cout << std::this_thread::get_id() << " Terminating" << std::endl;
        _thisPool->_coutMtx.unlock();   
    }
    catch (const std::exception&)
    {
        _ex = std::current_exception();
    }
    _isRunning = false;
}

终止所有工作线程

void ThreadPool::terminateThreads()
{
    for (std::vector<std::unique_ptr<WorkerThread>>::iterator it = _vThreads.begin(); it != _vThreads.end(); ++it)
    {
        it->get()->terminate();
        //it->get()->_thisThread.detach();

        // if thread threw an exception, rethrow it in main
        if (it->get()->exception() != nullptr)
            std::rethrow_exception(it->get()->exception());
    }
}

最后,调用线程池的函数(扫描函数在 main 上运行)

// scans a path recursively for all files of selected extension type, calls thread to parse file
unsigned int Functions::Scan(wpath path, const Args& args, ThreadPool& pool)
{
    wrecursive_directory_iterator d(path), e;
    unsigned int filesFound = 0;
    while ( d != e )
    {
        if (args.verbose())
            std::wcout << L"Grepping: " << d->path().string() << std::endl;

        for (Args::ext_T::const_iterator it = args.extension().cbegin(); it != args.extension().cend(); ++it)
        {
            if (extension(d->path()) == *it)
            {
                ++filesFound;
                pool.callThread(d->path());
            }
        }
        ++d;
    }

    std::cout << "Scan Function: Calling TerminateThreads() " << std::endl;
    pool.terminateThreads();
    std::cout << "Scan Function: Called TerminateThreads() " << std::endl;
    return filesFound;
}

我再重复一遍这个问题:在工作线程完成之前,我可以做些什么来暂停主线程?

【问题讨论】:

  • 您不能在线程的主循环中使用 bool,并希望将其设置为 true 将在线程中观察到。需要使用 std::atomic。这里不是唯一的问题,工作线程可以被阻塞并且永远不会观察到退出请求。总的来说这是一个很难解决的问题,原因是C++11添加了std::quick_exit()

标签: c++ multithreading threadpool deadlock race-condition


【解决方案1】:

问题有两个方面:

synchronizeStartingThreads() 有时会阻塞 1 或 2 个线程,等待可以继续进行(while (some_condition) barrierCond.wait(lock) 中的问题。条件有时永远不会评估为 true。删除 while loop 修复了这个阻塞问题。

第二个问题是工作线程有可能进入_threadMtx,并且在他们进入_threadCond.wait()之前调用了notify_all,因为已经调用了notify,线程将永远等待。

即。

{
    // terminate() is called
    std::unique_lock<std::mutex> lock(_threadMtx);
    // _threadCond.notify_all() is called here
    _busy = false;
    _threadCond.wait(lock);
    // thread is blocked forever
}

令人惊讶的是,在 terminate() 中锁定这个互斥锁并没有阻止这种情况的发生。

通过向 _threadCond.wait() 添加 30 毫秒的超时来解决此问题

此外,在任务开始之前添加了一项检查,以确保不会再次处理相同的任务。

新代码现在如下所示:

这个工作线程

_threadCond.wait_for(lock, std::chrono::milliseconds(30));  // hold the lock a max of 30ms

// after the lock, and the termination check

if(_busy)
        {
            Global::mFile_t rMap = _thisPool->_task(_pPath, *_pArgs, _thisPool->coutMatch_mtx);
            _workerMap.element.insert(rMap.element.begin(), rMap.element.end());
        }

【讨论】:

    【解决方案2】:

    我不明白线程终止和加入的问题。

    加入线程就是等待给定线程终止,所以这正是你想要做的。如果线程已经执行完毕,join 将立即返回。

    因此,您只需在 terminate 调用期间加入每个线程,就像您在代码中所做的那样。

    注意:目前,如果您刚刚终止的线程具有活动的exception_ptr,您会立即重新引发任何异常。这可能会导致未连接的线程。在处理这些异常时,您必须牢记这一点

    更新:查看您的代码后,我发现了一个潜在的错误:std::condition_variable::wait() 可以在发生虚假唤醒时返回。如果是这种情况,您将在上次工作的路径上再次工作,从而导致错误的结果。如果添加了新工作,您应该为新工作设置一个标志,并且_threadCond.wait(lock) 行应该在检查标志和_terminate 的循环中。不过,不确定那是否能解决您的问题。

    【讨论】:

    • 以上代码只有 20% 的时间有效。每隔一段时间,当我在那里加入时,程序就会无限期暂停。如果我删除连接,则不会发生死锁,但 main 在线程之前终止。 join 不知何故导致了死锁。
    • 是否有可能解除锁定在您的两个或多个线程之间?他们共享变量吗?如果是这种情况,删除join 调用只是通过在主线程终止时杀死子线程来隐藏死锁。
    • 是的。所有工作线程共享 1 个主要变量 - 映射映射。我通过函数指针传递了一个互斥锁,以锁定映射中元素的插入。然而,它现在似乎什么也没做,因为地图有时仍会返回错误数据。除了点 -> 程序经过了这一点,因此此时地图已经填充并且文件已经扫描。只有当我调用 terminateThreads() 时它才会无限期挂起。所以当我的线程此时都在睡觉时,我没有看到任何死锁。
    猜你喜欢
    • 1970-01-01
    • 2016-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-04
    相关资源
    最近更新 更多