【问题标题】:Shutdown boost threads correctly正确关闭 boost 线程
【发布时间】:2015-06-11 09:12:05
【问题描述】:

我有 x 个同时工作的 boost 线程。一个生产者线程用计算任务填充同步队列。消费者线程弹出任务并计算它们。

图片来源:https://www.quantnet.com/threads/c-multithreading-in-boost.10028/

用户可能会在这个过程中完成程序,所以我需要正确关闭我的线程。我目前的方法似乎不起作用,因为抛出了异常。它的目的是在系统关闭时,所有进程都应该被杀死并停止他们当前的任务,无论他们做什么。你能告诉我,你将如何杀死那些线程?

线程初始化:

    for (int i = 0; i < numberOfThreads; i++)
    {
        std::thread* thread = new std::thread(&MyManager::worker, this);
        mThreads.push_back(thread);
    }

线程销毁:

void MyManager::shutdown()
{
    for (int i = 0; i < numberOfThreads; i++)
    {
        mThreads.at(i)->join();
        delete mThreads.at(i);
    }
    mThreads.clear();
}

工人:

void MyManager::worker()
{
    while (true)
    {

        int current = waitingList.pop();
        Object * p = objects.at(current);
        p->calculateMesh(); //this task is internally locked by a mutex

        try
        {
            boost::this_thread::interruption_point();
        }
        catch (const boost::thread_interrupted&)
        {
            // Thread interruption request received, break the loop
            std::cout << "- Thread interrupted. Exiting thread." << std::endl;
            break;
        }
    }
}

同步队列:

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue
{
public:

    T pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        while (queue_.empty())
        {
            cond_.wait(mlock);
        }
        auto item = queue_.front();
        queue_.pop();

        return item;
    }

    void push(const T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        queue_.push(item);
        mlock.unlock();
        cond_.notify_one();
    }


    int sizeIndicator()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        return queue_.size();
    }


private:

    bool isEmpty() {
        std::unique_lock<std::mutex> mlock(mutex_);
        return queue_.empty();
    }

    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

抛出的错误调用栈:

... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68   C++
... std::_Mutex_base::lock() Line 42    C++
... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220   C++
... ThreadSafeQueue<int>::pop() Line 13 C++
... MyManager::worker() Zeile 178   C++

【问题讨论】:

  • 两件事:isEmpty 没有被锁定,size() 可以有一个更简单的实现:在互斥锁被锁定后,你可以简单地返回 queue_.size() (并且 mlock 析构函数释放互斥锁)
  • @marom 谢谢,更正了我的代码。错误仍然存​​在。
  • 两件事:isEmpty 和 size 可能不公开。当调用者评估时,他们报告的任何内容都可能无效。除非它们被私下使用,否则它们将被删除。
  • 如果您只想指示正在进行的工作(例如进度条),则类似 size 的成员可能有意义
  • 一些 cmets 和答案已经存在。总结:您应该中断所有线程,然后加入所有线程,最后删除线程对象和队列。您的线程不需要中断点,因为它们在 push/pop 函数中休眠。 try 块必须包含 push/pop 调用以捕获中断的异常。

标签: c++ multithreading boost


【解决方案1】:

根据我在 Boost 和 Java 中使用线程的经验,尝试从外部关闭线程总是很麻烦。我从来没有真正让它干净地工作。

我得到的最好的结果是为所有消费者线程提供一个布尔值,该值设置为 true。当您将其设置为 false 时,线程将自行返回。在您的情况下,这可以很容易地放入您拥有的 while 循环中。

最重要的是,您将需要一些同步,以便您可以等待线程返回,然后再删除它们,否则您可能会遇到一些难以定义的行为。

我过去项目的一个例子:

线程创建

barrier = new boost::barrier(numOfThreads + 1);
threads = new detail::updater_thread*[numOfThreads];

for (unsigned int t = 0; t < numOfThreads; t++) {
    //This object is just a wrapper class for the boost thread.
    threads[t] = new detail::updater_thread(barrier, this);
}

线程销毁

for (unsigned int i = 0; i < numOfThreads; i++) {
    threads[i]->requestStop();//Notify all threads to stop.
}

barrier->wait();//The update request will allow the threads to get the message to shutdown.

for (unsigned int i = 0; i < numOfThreads; i++) {
    threads[i]->waitForStop();//Wait for all threads to stop.
    delete threads[i];//Now we are safe to clean up.
}

线程包装器中可能感兴趣的一些方法。

//Constructor
updater_thread::updater_thread(boost::barrier * barrier)
{
   this->barrier = barrier;
   running = true;

   thread = boost::thread(&updater_thread::run, this);
}

void updater_thread::run() {
    while (running) {
        barrier->wait();
        if (!running) break;

        //Do stuff

        barrier->wait();
    }
}

void updater_thread::requestStop() {
    running = false;
}

void updater_thread::waitForStop() {
    thread.join();
}

 

【讨论】:

    【解决方案2】:

    尝试将“尝试”向上移动(如下例所示)。如果您的线程正在等待数据(在 waitingList.pop() 内),那么可能在条件变量 .wait() 内等待。这是一个“中断点”,因此可能会在线程被中断时抛出。

    void MyManager::worker()
    {
        while (true)
        {
            try
            {
                int current = waitingList.pop();
                Object * p = objects.at(current);
                p->calculateMesh(); //this task is internally locked by a mutex
    
                boost::this_thread::interruption_point();
            }
            catch (const boost::thread_interrupted&)
            {
                // Thread interruption request received, break the loop
                std::cout << "- Thread interrupted. Exiting thread." << std::endl;
                break;
            }
        }
    }
    

    【讨论】:

    • 试过了,但错误仍然存​​在。我正在使用崩溃的调试调用列表更新问题。
    • 我会将整个循环包装成一个 try-catch 子句。它不会在这里改变任何东西,但例外的关键在于它们遍历所有控制流结构,如循环,因此您不必手动执行此操作。
    【解决方案3】:

    也许你捕捉到了错误的异常类? 这意味着它不会被抓住。 对线程不太熟悉,但是否是 std::threads 和 boost::threads 的混合导致了这种情况?

    尝试捕获最低的父异常。

    【讨论】:

      【解决方案4】:

      我认为这是读写器线程在公共缓冲区上工作的经典问题。解决此问题的最安全方法之一是使用互斥锁和信号。(我无法在此处发布代码。请给我发电子邮件,我将代码发布给您)。

      【讨论】:

        猜你喜欢
        • 2018-04-30
        • 2014-04-19
        • 1970-01-01
        • 1970-01-01
        • 2014-06-11
        • 1970-01-01
        • 1970-01-01
        • 2013-07-26
        • 2016-12-23
        相关资源
        最近更新 更多