【问题标题】:C++ how one thread can communicate properly that its task is finished?C ++一个线程如何正确通信其任务已完成?
【发布时间】:2020-06-30 20:02:11
【问题描述】:

我是 C++ 和线程的新手。所以我很困惑。

我正在尝试编写一个包装通用队列并提供两种方法的类:push 和 pop。 (线程安全)

push 方法将使用 lock_guard 并将作为参数接收的 elem 推送到队列中。 pop 方法会一直等到至少有一个元素可以从队列中读取而不消耗 CPU(我使用了 condition_variable 和它的“wait”方法)。

我认为我对类本身的实现没有任何问题。 但是我在线程中使用这个类时遇到了问题。

我使用两个线程(“生产者”和“消费者”):

-“producer”读取一个 .txt 文件(每行包含一个字符串)并为每个字符串调用 push 方法。

-"consumer" 改为调用 pop 方法并将结果存储到字符串向量中。

我不知道如何管理调用对象 bf 的 pop 方法的消费者循环的终止。 我采用的临时解决方案是将字符串“EOF”推入队列,以警告消费者线程生产者已完成任务但我不喜欢它。

我想在 Buffer 类(受互斥锁保护)中定义一个布尔标志(初始化为 false)以及将其设置为 true 的方法(我们可以称之为“setFlagTrue”)。

生产者将在循环结束时调用 setFlagTrue 方法。 相反,消费者会以某种方式检查标志以停止迭代。

我对其正确性感到困惑,因为我想确保这些情况不会 100% 发生:

  1. 生产者将每个字符串推入队列并将标志设置为真。消费者是如此之快,以至于在标志被交换为真之前从队列中弹出每个字符串,因此它再次开始等待元素被弹出而不被通知。
  2. 消费者读取标志值 true 但尚未完成弹出字符串。 (我想我可以解决这个检查标志以及队列中是否还有字符串的问题。对吗?)
  3. 有关“生产者”和“消费者”线程的相对速度的任何其他可能问题。

这是我写的代码:

#include <iostream>
#include <mutex>
#include <fstream>
#include <condition_variable>
#include <queue>
#include <string>
#include <thread>
#include <vector>

template <typename T>
class Buffer{

private:
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> buffer;

public:
    void push(T elem){
        std::lock_guard lg(m);
        buffer.push(elem);
        cv.notify_one();
    }

    T pop(){
        T elemToReturn;
        std::unique_lock ul(m);
        cv.wait(ul, [this](){return !this->buffer.empty();});
        elemToReturn = buffer.front();
        buffer.pop();
        ul.unlock();
        return elemToReturn;
    }
};

int main() {
    Buffer<std::string> bf;
    std::string filename("../file.txt");
    std::vector<std::string> results;

    std::thread producer ([&bf, filename](){
        std::ifstream inputFile(filename);
        std::string str;

        while(getline(inputFile, str)){
            bf.push(str);
        }

            bf.push("EOF");
    });

    std::thread consumer ([&bf, &results](){
        std::string elem;
        for(elem=bf.pop(); elem!="EOF"; elem=bf.pop()){
            results.push_back(elem);
        }
    });

    producer.join();
    consumer.join();

    for(auto &elem: results){
       std::cout<<elem<<std::endl;
    }

    return 0;
}

如果您能向我解释如何避免这些问题并发布您的解决方案代码,我将永远感激您。 请给我一个线程安全的准确定义。

感谢您的宝贵时间。

【问题讨论】:

  • 只是一个建议:你怎么知道你在文件的末尾?可以为消费者端实现类似的接口。此外,您如何表示您已完成对文件的写入?生产者端可以实现类似的接口。也就是说,写一个所谓的信号值(“EOF”字符串)是一种常见的方法。
  • 如果您需要一种关闭缓冲区的方法,那么该类应该实现一种关闭缓冲区的方法。这可能意味着一个“signalEOF”函数,它设置一个标志并发出条件变量的信号,以及pop 函数指示缓冲区已关闭的方式。
  • 可以推荐一本叫《concurrency in action》的书,非常适合学习线程安全。
  • @DavidSchwartz 所以在实践中我应该定义一个设置标志并调用 cv.notify_one() 的方法?就这些?
  • @rustyx 谢谢。我肯定会读它,但我会努力在 10 天内维持考试。我觉得我没有足够的时间阅读所有内容。

标签: c++ multithreading thread-safety thread-synchronization


【解决方案1】:

我会建议 2 种可能的简单解决方案:

  1. std::optional&lt;std::string&gt; 用作T 并将空可选作为eof 的标志。使用特殊字符串是容易出错的解决方案。此方法需要对您的代码进行最小的更改。

  2. Buffer 中添加一个布尔标志,并在确保缓冲区为空后在pop 中检查它。这需要更多的代码更改 - 您的谓词需要扩展并检查标志。您还需要决定返回给调用者的内容以通知结束条件(或通过异常)。当然你需要在 mutex locked 下设置 flag 并调用 notify。

所以我会说方法 1 会更简单易懂,即更具可读性,如果在代码中添加移动语义,它的效率也不会降低:

void push(T elem){
    std::lock_guard lg(m);
    if( buffer.empty() )
        cv.notify_one();
    buffer.push( std::move(elem) );
}

T pop(){
    std::unique_lock ul(m);
    cv.wait(ul, [this](){return !this->buffer.empty();});
    auto elemToReturn = std::move( buffer.front() );
    buffer.pop();
    return elemToReturn;
}

【讨论】:

  • 非常感谢您的回答。
  • 我使用了第一个建议。现在有一个问题:我需要使用 N 个生产者来实现相同的功能。我认为第一种方法很难使用。所以我更喜欢使用每次生产者线程完成工作时都会增加的标志。我现在的问题是:这个 int 标志是否应该被包装到 std::atomic 中,为什么?或者使用受互斥体保护的 int 变量就足够了?
猜你喜欢
  • 2012-03-06
  • 2010-10-02
  • 1970-01-01
  • 2017-09-12
  • 2013-12-11
  • 2023-03-29
  • 2011-07-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多