【问题标题】:Do I have a deadlock or race condition in my producer consumer code?我的生产者消费者代码中是否存在死锁或竞争条件?
【发布时间】:2020-06-12 17:11:18
【问题描述】:

我正在尝试在 C++ 中实现生产者 消费者模式。 当我读到这种模式时,他们似乎总是提到必须避免的潜在死锁。 但是,我在下面没有使用任何互斥体就实现了这一点。 我的代码有什么问题?

#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>

class CircularBuffer
{
public:
    CircularBuffer();
    int*          getWritePos();
    void      finishedWriting();
    int*           getReadPos();
    void      finishedReading();
private:
    void waitForReaderToCatchUp();
    void waitForWriterToCatchUp();

    const int size = 5;
    std::vector<int> data;
    // Changed from int since these variables are shared between the two threads and assignment is not necessarily atomic: 
    std::atomic<int> writePos = 0;
    std::atomic<int> readPos = 0;
};

CircularBuffer::CircularBuffer() {
    data.resize(size);
}

void
CircularBuffer::waitForReaderToCatchUp() {
    int unread = writePos - readPos;
    while (unread >= size) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getWritePos() {
    waitForReaderToCatchUp();
    int pos = writePos % size;
    return &data[pos];
}

void
CircularBuffer::finishedWriting() {
    writePos++;
}

void
CircularBuffer::waitForWriterToCatchUp() {
    int unread = writePos - readPos;
    while (unread < 1) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getReadPos() {
    waitForWriterToCatchUp();
    int pos = readPos % size;
    return &data[pos];
}

void
CircularBuffer::finishedReading() {
    readPos++;
}

const int produceMinTime = 100;

void produce(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        int r = rand() % 1000;
        std::this_thread::sleep_for(std::chrono::milliseconds(produceMinTime + r));
        int *p = cb->getWritePos();
        memcpy(p, &i, 4);
        cb->finishedWriting();
    }
}

void consume(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int *p = cb->getReadPos();
        int j = *p;
        std::cout << "Value: " << j << std::endl;
        cb->finishedReading();
    }
}

int main()
{
    CircularBuffer cb;
    std::thread t1(produce, &cb);
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    std::thread t2(consume, &cb);

    t1.join();
    t2.join();
    int k;
    std::cin >> k;
}

【问题讨论】:

  • 我不明白你的问题是什么?如果你没有像互斥锁这样的东西,那么你就不会出现死锁。
  • 不确定此代码中的死锁 - 你必须询问提出它的人 - 但任何时候你看到 this_thread::sleep_for() 问题都应该问
  • 您不会遇到死锁,但您会遇到更糟糕的情况:由于数据竞争导致的未定义行为。您需要同步原语来阻止它们。
  • 代码不起作用,问题是它没有经过足够好的测试。数据竞争错误每月仅发生一次,一个月或一个月。另一个值得注意的缺陷是,在现代机器上,一毫秒是永恒的。不用自己写,google“c++线程安全循环缓冲区”找代码。
  • @HansPassant:谢谢。我把它改成了 10 纳秒。

标签: c++ multithreading


【解决方案1】:

std::vector&lt;int&gt; 不是线程安全的数据结构。因此,如果您同时从两个线程访问它,这将被视为未定义的行为。您可能会崩溃、遇到其他问题,或者它可能看似有效(但仍然是错误的)。

向量中的整数和代表你位置的整数也不是线程安全的——读/写不一定是原子的(有无锁的方法可以做到这一点)。

因此,您可以完全实现这种无锁的东西,但不能这样。这里有一些信息:https://www.infoq.com/news/2014/10/cpp-lock-free-programming/

一般来说,你想看看 std::atomic 中的原语:https://en.cppreference.com/w/cpp/atomic/atomic

另见:Ring buffer with atomic indexes

【讨论】:

  • 感谢您的帮助!如果我将 writePos 和 readPos 更改为 atomic 会解决您第二段中的问题吗?
  • 关于 std::vector 不是线程安全的。据我了解,从不同的线程阅读可以吗?就我而言,我既读又写。但是,我从不同时写入和读取相同的元素。生产者总是比消费者领先至少 1 个元素。向量也永远不会调整大小。这也不行吗? stackoverflow.com/questions/32694114/…
  • en.cppreference.com/w/cpp/container#Thread_safety: "所有的 const 成员函数都可以被同一个容器上的不同线程同时调用。", "operator[], 出于线程安全的目的, 表现为 const", "不同的元素在同一个容器中可以被不同的线程同时修改”。
  • 太棒了。那么你仍然需要处理pos vars。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-04-01
  • 2018-06-03
  • 1970-01-01
  • 2013-03-06
  • 1970-01-01
  • 2011-04-25
  • 1970-01-01
相关资源
最近更新 更多