【问题标题】:c++11 lock-free queue with 2 threadc++11 2线程无锁队列
【发布时间】:2023-03-17 08:13:01
【问题描述】:

除了主线程,我还有一个线程接收数据并将它们写入文件。

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;

void setData(const std::vector<int>& data) {
    std::lock_guard<std::mutex> lock(mutex);
    dataQueue.push(data);
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::vector<int>& data= dataQueue.front();

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

setData 被主线程使用,write 实际上是写线程。我使用std::lock_quard 来避免内存冲突,但是当锁定写入线程时,它会减慢主线程,因为它必须等待队列被解锁。但我想我可以避免这种情况,因为线程永远不会同时作用于队列的同一个元素。

所以我想做到无锁,但我真的不明白我应该如何实现它。我的意思是,我怎么能在不锁定任何东西的情况下做到这一点?此外,如果写入线程比主线程快,则队列可能大部分时间都是空的,因此它应该以某种方式等待新数据,而不是无限循环以检查非空队列。

编辑:我将简单的std::lock_guard 更改为std::cond_variable,以便它可以在队列为空时等待。但是主线程仍然可以被阻塞,当cvQeue.wait(.)被解析时,它会重新获得锁。另外,如果主线程执行cvQueue.notify_one(),而写线程没有等待呢?

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;
std::condition_variable cvQueue;

void setData(const std::vector<int>& data) {
    std::unique_lock<std::mutex> lock(mutex);
    dataQueue.push(data);
    cvQueue.notify_one();
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::unique_lock<std::mutex> lock(mutex);
            cvQueue.wait(lock);

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

【问题讨论】:

  • 您可以简单地在write 呼叫周围解锁,然后重新锁定。写入时无需持有锁,这可能是循环中最耗时的部分。

标签: multithreading c++11 locking lock-free


【解决方案1】:

如果您只有两个线程,则可以使用无锁单生产者单消费者 (SPSC) 队列。
可以在此处找到有界版本:https://github.com/rigtor/SPSCQueue
Dmitry Vyukov 在这里展示了一个无限版本:http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue(但您应该注意,这段代码应该适合使用原子。)

关于阻塞弹出操作 - 这是无锁数据结构不提供的东西,因为这样的操作显然不是无锁的。但是,以这样一种方式调整链接实现应该相对简单,即如果队列在推送之前为空,则推送操作会通知条件变量。

【讨论】:

    【解决方案2】:

    我想我有一些东西可以满足我的需要。我做了一个使用std::atomicLockFreeQueue。因此,我可以原子地管理队列头/尾的状态。

    template<typename T>
    class LockFreeQueue {
    public:
        void push(const T& newElement) {
            fifo.push(newElement);
            tail.fetch_add(1);
            cvQueue.notify_one();
        }
    
        void pop() {
            size_t oldTail = tail.load();
            size_t oldHead = head.load();
    
            if (oldTail == oldHead) {
                return;
            }
    
            fifo.pop();
            head.store(++oldHead);
        }
    
        bool isEmpty() {
            return head.load() == tail.load();
        }
    
        T& getFront() {
            return fifo.front();
        }
    
        void waitForNewElements() {
            if (tail.load() == head.load()) {
                std::mutex m;
                std::unique_lock<std::mutex> lock(m);
                cvQueue.wait_for(lock, std::chrono::milliseconds(TIMEOUT_VALUE));
            }
        }
    
    private:
        std::queue<T> fifo;
        std::atomic<size_t> head = { 0 };
        std::atomic<size_t> tail = { 0 };
        std::condition_variable cvQueue;
    };
    
    LockFreeQueue<std::vector<int>> dataQueue;
    std::atomic<bool> store(true);
    
    void setData(const std::vector<int>& data) {
        dataQueue.push(data);
        // do other things
    }
    
    void write(const std::string& fileName) {
        std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));
    
        while (store.load()) {
    
            dataQueue.waitForNewElements();
    
            while (!dataQueue.isEmpty()) {
                std::vector<int>& data= dataQueue.getFront();
    
                ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());
    
                dataQueue.pop();
                }
            }
        }
    }
    

    我在waitForNewElements 中仍然有一个锁,但它并没有锁定整个过程,因为它正在等待要做的事情。但最大的改进是生产者可以在消费者弹出时推送。只有当LockFreQueue::tailLockFreeQueue::head 相同时才被禁止。表示队列为空,进入等待状态。

    我不太满意的是cvQueue.wait_for(lock, TIMEOUT_VALUE)。我想做一个简单的cvQueue.wait(lock),但问题是当涉及到结束线程时,我在主线程中做store.store(false)。因此,如果写入线程正在等待,它将永远不会在没有超时的情况下结束。所以,我设置了一个足够大的超时时间,这样大部分时间condition_variable 被锁解决,而当线程结束时,它被超时解决。

    如果您觉得某些地方一定有问题或必须改进,请随时发表评论。

    【讨论】:

    • std::queue 不是线程安全的。您没有使用任何锁定,但是当两个线程同时运行 fifo.pushfifo.pop 时,您也没有做任何事情来确保安全!也许这恰好适用于std::list 底层容器,尤其是使用您的原子计数器来阻止您在空的 fifo 上运行 pop 我认为这是目的?但总的来说,将std::queue 用作 SPSC 队列并不安全。
    • 我不明白。推送后,推送计数器增加。所以,新元素只有在完成后才会“活跃起来”,不是吗?
    • 是的,并且没有任何加载/存储在 std::atomic 对象上,因此它的数据竞争 UB 供不同线程访问它们。在 x86 上,编译器生成的任何 asm 都将受益于 x86 的强内存模型(每个加载/存储都是获取/释放)。弱排序的 ISA 可能只需要在读取端使用mo_consume,因为数据依赖于它们加载的指针,因此它们免费执行此操作,但写入端不会使用发布存储来确保最后的指针修改是'在节点中的所有其他数据都可见之前不可见。
    • 编写一个安全的 SPSC 链表队列肯定需要 一些 atomic&lt;&gt; 操作,绝对是为了使其安全可移植,而不仅仅是在 x86 上偶然工作.除了std::queue 本身之外的原子计数器之外,您的队列不使用任何东西,您将其与seq_cst 订单一起使用。我很确定这些还不够。
    • en.cppreference.com/w/cpp/container/list 并没有说它是线程安全的,所以你必须假设它不是。它确实说其他迭代器不会因您使用的操作而无效,但这与线程安全不同。 list&lt;&gt; 对象本身有一些 push/pop 将修改的指针。 IDK 在某些实现上可能碰巧是安全的。
    猜你喜欢
    • 2013-02-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-18
    • 2013-07-31
    • 2014-10-31
    • 2016-06-11
    • 2013-04-22
    相关资源
    最近更新 更多