【问题标题】:c++ class objects in concurrent threadsc++ 并发线程中的类对象
【发布时间】:2019-02-12 15:58:14
【问题描述】:

我试图启动一个有正在运行的线程的类。当我创建对象时,它会创建一组错误的对象。我真的不明白发生了什么。这个想法是用它们的线程创建 10 个对象并从线程 f2 获取消息。我需要一个按规格工作线程的队列。我使用互斥锁是因为我认为 c++ std:queue 不是线程安全的。问题是当 main 运行时它显示大多数对象都是相同的。我贴出代码:

class classOne {
public:
    classOne(int n, std::shared_ptr<std::mutex> prompr_mtx_ptr);
    ~classOne();

    int m_id;
    bool m_stop;
    std::shared_ptr<std::mutex> m_mtx;
    std::shared_ptr<std::thread> m_classOne_thread;
    std::shared_ptr<std::mutex> m_prompr_mtx_ptr;
    std::shared_ptr<std::queue<std::string>> m_queue;

    void classOne_thread_body();
};

classOne::~classOne() {

}

classOne::classOne(int n, std::shared_ptr<std::mutex> prompr_mtx_ptr):
        m_id(n),
        m_prompr_mtx_ptr(prompr_mtx_ptr)
{
    m_queue = std::make_shared<std::queue<std::string>>();
    m_mtx = std::make_shared<std::mutex>();
    m_stop = false;
    m_classOne_thread = std::make_shared<std::thread>(&classOne::classOne_thread_body, this);
}

void classOne::classOne_thread_body() {
    int watchDog = 0, debugDog = 0;
    m_prompr_mtx_ptr->lock();
    std::cout<<"[ Thread "<<std::to_string(m_id)<<" ] Started"<<std::endl;
    m_prompr_mtx_ptr->unlock();
    while (!m_stop) {
        m_mtx->lock();
        if (!m_queue->empty()) {
            m_prompr_mtx_ptr->lock();
            std::cout<<"[ Thread "<<std::to_string(m_id)<<" ] Received message: "<<m_queue->front()<<std::endl;
            m_prompr_mtx_ptr->unlock();
            m_queue->pop();
            watchDog = 0;
        }
        m_mtx->unlock();

        ++watchDog;
        if (watchDog>= 1000) { m_stop = true; }
        if (watchDog - debugDog > 100) {
            m_prompr_mtx_ptr->lock();
            std::cout<<"WatchDog: "<<watchDog<<std::endl;
            m_prompr_mtx_ptr->unlock();
            debugDog = watchDog;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void f2(int n, std::vector<classOne> elements)
{
    for (int i = 0; i < 100; ++i) {
        for(std::vector<classOne>::iterator it=elements.begin(); it!=elements.end(); ++it) {
            it->m_mtx->lock();
            it->m_queue->push(std::string("Message "+std::to_string(i)+" to thread "+std::to_string(it->m_id)+" from "+std::to_string(n)));
            it->m_mtx->unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
}

int main()
{
    std::thread t1; // t1 is not a thread
    int generators = 10;
    std::vector<classOne> genClassOnes;
    std::shared_ptr<std::mutex> prompr_mtx_ptr = std::make_shared<std::mutex>();

    for (int i = 0; i < generators; i++) {
        genClassOnes.push_back(classOne(i, prompr_mtx_ptr));
    }

    std::thread t2(f2, 1, genClassOnes); // pass by value
    t2.join();

    for (std::vector<classOne>::iterator iter=genClassOnes.begin(); iter!=genClassOnes.end(); ++iter) {
        iter->m_classOne_thread->join();
    }
}

这是错误的输出:

[ Thread 3 ] Started
[ Thread 8 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Started
[ Thread 9 ] Received message: Message 0 to thread 9 from 1

【问题讨论】:

  • 我会非常谨慎地将 classOne 对象存储在这样的向量中。如果向量在五个元素后调整大小怎么办?是的,所有对象都将被复制到新向量中,但正在运行的线程将仍然使用旧地址作为它们的this 指针。至少使用new 堆分配classOne 对象并使用vector&lt;classOne*&gt;
  • 一个小注释:生成一个线程然后立即joining 它并不比直接执行代码好。
  • 最后,一旦你得到这个工作,阅读条件变量。您的消费者线程应该阻塞条件变量,并且只有在消费者将消息传递到他们的队列时才会唤醒。

标签: c++ c++11 concurrency


【解决方案1】:

std::vector&lt;classOne&gt; 要求 classOne 在您调用 push_backresize 时是可复制的。而std::list&lt;classOne&gt; 不需要classOne 是可复制的,因此您可以删除所有这些shared_ptrs。

使用条件变量让线程知道队列中有可用的新元素。

工作示例:

#include <list>
#include <queue>
#include <mutex>
#include <string>
#include <thread>
#include <iostream>
#include <condition_variable>

class classOne {
    int m_id;
    std::mutex m_mtx;
    std::condition_variable m_cnd;
    std::mutex* const m_prompr_mtx_ptr;
    std::queue<std::string> m_queue;
    std::thread m_classOne_thread;

    void classOne_thread_body();

    template<class F>
    void log(F&& f) {
        std::lock_guard<std::mutex> lock(*m_prompr_mtx_ptr);
        f(std::cout);
    }

public:
    classOne(int n, std::mutex& prompr_mtx_ptr);

    ~classOne() {
        this->stop();
        m_classOne_thread.join();
    }

    void post(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_queue.push(move(msg));
        }
        m_cnd.notify_one();
    }

    void stop() { this->post({}); }

    int get_id() const { return m_id; }
};

classOne::classOne(int n, std::mutex& prompr_mtx_ptr):
    m_id(n),
    m_prompr_mtx_ptr(&prompr_mtx_ptr),
    m_classOne_thread(&classOne::classOne_thread_body, this)
{}

void classOne::classOne_thread_body() {
    log([&](std::ostream& s) { s <<"[ Thread "<<std::to_string(m_id)<<" ] Started"<<std::endl; });
    for(std::string msg;;) {
        {
            std::unique_lock<std::mutex> lock(m_mtx);
            while(m_queue.empty())
                m_cnd.wait(lock);
            msg = m_queue.front();
            m_queue.pop();
        }
        if(msg.empty())
            break;
        log([&](std::ostream& s) { s <<"[ Thread "<<std::to_string(m_id)<<" ] Received message: "<< msg << std::endl; });
    }
}

void f2(int n, std::list<classOne>& elements) {
    for (int i = 0; i < 100; ++i) {
        for(auto& elem : elements)
            elem.post("Message "+std::to_string(i)+" to thread "+std::to_string(elem.get_id())+" from "+std::to_string(n));
    }
}

int main() {
    int generators = 10;
    std::mutex prompr_mtx_ptr;

    std::list<classOne> genClassOnes;
    for(int i = 0; i < generators; i++)
        genClassOnes.emplace_back(i, prompr_mtx_ptr);

    std::thread t2(f2, 1, std::ref(genClassOnes)); // pass by reference
    t2.join();
}

【讨论】:

  • 很棒的代码,非常感谢。有很多事情我需要更好地理解,比如您在 classOne 中实现的 log 方法和这个 lock_guard,但我将使用 c++ 参考来研究这些内容。一个特别的疑问,为什么在调用 stop 方法时使用空字符串调用方法 post ?确保线程退出while循环?非常感谢。
  • @JonSnow 你说得对,空字符串作为工作线程跳出队列处理循环并终止的信号。
猜你喜欢
  • 2012-08-07
  • 2017-12-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-10-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多