【问题标题】:Blocking concurrent queue with lock-free algorithms使用无锁算法阻塞并发队列
【发布时间】:2016-12-20 11:16:16
【问题描述】:

我正在构建一个阻塞的并发队列(即,如果队列中没有任何内容,消费者就会休眠,直到生产者唤醒)。我的用例受到限制,因为出队操作将始终阻塞在空队列上;没有tryDequeue

我当前的实现只是使用std::mutexstd::condition_variable。我想知道用无锁算法改进数据结构是否有意义。在队列为空的情况下,这可能没有意义,因为消费者无论如何都必须阻止。但是在队列非空的情况下,我能做些什么吗?

【问题讨论】:

  • 为什么要结束投票?
  • 您不能以无锁方式明智地“阻止”。 (你可以忙循环,但这没有用。)
  • @KerrekSB 我在想也许在出队时,我可以先无锁地尝试弹出前面,只有当这不起作用时才进入阻塞阶段? (当然,入队必须相应地改变。)
  • @Kerrek,busy-loops (AKA spinlocks) 通常用于这种目的 - 即当您期望其他线程“锁定”共享对象一段时间时间比上下文切换到内核执行传统阻塞所产生的成本要短。
  • 大多数现代锁在切换上下文之前都会旋转;因此,如果不进行更重大的设计更改,您可能一无所获。

标签: c++ multithreading


【解决方案1】:

我认为这个问题是在问您是否应该无锁。

在现代系统上,无锁只有在每个项目的工作时间大大低于 1 毫秒或一些有趣的大规模锁争用场景(未讨论,Web 服务器就是一个例子)时才有意义。

您可以通过查看条件变量响应所需的时间来了解通知条件变量所需的时间:

#include <thread>
#include <array>
#include <mutex>
#include <iostream>
#include <condition_variable>

std::chrono::microseconds timestamp()
{
    const auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch());
    return microseconds;
}
struct measurement
{
    std::chrono::microseconds sent, acknowledged, received;
    measurement() :sent{ 0 }, acknowledged{ 0 }, received{ 0 } {}
};

int main(int argc, char** argv)
{
    std::array<measurement, 15> notifications;
    std::mutex notifications_m;
    std::condition_variable notification_cv;
    auto notifying_thread = std::thread([&] {
        for (auto i = 0; i < notifications.size(); ++i)
        {
            {
                std::unique_lock<std::mutex> lk(notifications_m);
                notifications[i].sent = timestamp();
            }
            notification_cv.notify_one();
            //Now we wait for the notification to be acknowledged
            std::unique_lock<std::mutex> lk(notifications_m);
            const auto check_that_acknowledged = [&] {return notifications[i].acknowledged != std::chrono::microseconds(0); };
            notification_cv.wait(lk, check_that_acknowledged);
            notifications[i].received = timestamp();

        }
    });
    for (auto i = 0; i < notifications.size(); ++i)
    {
        {
            std::unique_lock<std::mutex> lk(notifications_m);
            const auto check_that_sent = [&] {return notifications[i].sent != std::chrono::microseconds(0); };
            notification_cv.wait(lk, check_that_sent);
            notifications[i].acknowledged = timestamp();
        }
        notification_cv.notify_one();
    }
    notifying_thread.join();
    //
    for (const auto& notification : notifications)
    {
        const auto difference_us = notification.received - notification.sent;
        std::cout << difference_us.count() << " microseconds" << std::endl;
    }
    return 0;
}

即使在一些延迟时间最长的系统上,往返也需要大约 0.1 毫秒: https://coliru.stacked-crooked.com/a/9598c83dd05e09b5

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-16
    • 2010-11-15
    • 2013-07-25
    相关资源
    最近更新 更多