【问题标题】:Spurious underflow in C++ lock-free queue implementationC++ 无锁队列实现中的虚假下溢
【发布时间】:2018-04-30 06:04:24
【问题描述】:

我正在尝试实现一个使用线性循环缓冲区来存储数据的无锁队列。与通用无锁队列相比,我有以下放松条件:

  • 我知道最坏情况下将存储在队列中的元素数量。队列是对一组固定元素进行操作的系统的一部分。代码将永远不会尝试在队列中存储更多元素,因为此固定集合中有元素。
  • 没有多生产者/多消费者。队列将用于多生产者/单消费者单生产者/多消费者设置。

从概念上讲,队列实现如下

  • 标准二次幂环形缓冲区。 底层数据结构是使用power-of-two trick 的标准环形缓冲区。读取和写入索引只会递增。当使用简单的位掩码对数组进行索引时,它们被限制为底层数组的大小。读指针在pop() 中原子递增,写指针在push() 中原子递增。
  • 大小变量控制对pop() 的访问。一个额外的“大小”变量跟踪队列中元素的数量。这消除了对读取和写入索引执行算术的需要。 size 变量在整个写入操作发生后自动递增,即数据已写入后备存储并且写入光标已递增。我正在使用compare-and-swap (CAS) 操作以原子方式减小pop() 中的大小,并且仅在大小不为零时才继续。这样pop()应该保证返回有效数据。

我的队列实现如下。请注意,每当pop() 尝试读取之前由push() 写入的内存时,调试代码就会停止执行。这绝不应该发生,因为 - 至少在概念上 - pop() 可能仅在队列中有元素时才会继续(不应该有下溢)。

#include <atomic>
#include <cstdint>
#include <csignal> // XXX for debugging

template <typename T>
class Queue {
private:
    uint32_t m_data_size;   // Number of elements allocated
    std::atomic<T> *m_data; // Queue data, size is power of two
    uint32_t m_mask;        // Bitwise AND mask for m_rd_ptr and m_wr_ptr
    std::atomic<uint32_t> m_rd_ptr; // Circular buffer read pointer
    std::atomic<uint32_t> m_wr_ptr; // Circular buffer write pointer
    std::atomic<uint32_t> m_size;   // Number of elements in the queue

    static uint32_t upper_power_of_two(uint32_t v) {
        v--; // https://graphics.stanford.edu/~seander/bithacks.html
        v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
        v++;
        return v;
    }

public:
    struct Optional { // Minimal replacement for std::optional
        bool good;
        T value;
        Optional() : good(false) {}
        Optional(T value) : good(true), value(std::move(value)) {}
        explicit operator bool() const { return good; }
    };

    Queue(uint32_t max_size)
        : // XXX Allocate 1 MiB of additional memory for debugging purposes
          m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
          m_data(new std::atomic<T>[m_data_size]),
          m_mask(m_data_size - 1),
          m_rd_ptr(0),
          m_wr_ptr(0),
          m_size(0) {
        // XXX Debug code begin
        // Fill the memory with a marker so we can detect invalid reads
        for (uint32_t i = 0; i < m_data_size; i++) {
            m_data[i] = 0xDEADBEAF;
        }
        // XXX Debug code end
    }

    ~Queue() { delete[] m_data; }

    Optional pop() {
        // Atomically decrement the size variable
        uint32_t size = m_size.load();
        while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
        }

        // The queue is empty, abort
        if (size <= 0) {
            return Optional();
        }

        // Read the actual element, atomically increase the read pointer
        T res = m_data[(m_rd_ptr++) & m_mask].load();

        // XXX Debug code begin
        if (res == T(0xDEADBEAF)) {
            std::raise(SIGTRAP);
        }
        // XXX Debug code end
        return res;
    }

    void push(T t) {
        m_data[(m_wr_ptr++) & m_mask].store(t);
        m_size++;
    }

    bool empty() const { return m_size == 0; }
};

但是,下溢确实会发生,并且很容易在多线程压力测试中触发。在这个特定的测试中,我维护了两个队列q1q2。在主线程中,我将固定数量的元素输入q1。两个工作线程从q1 读取并在紧密循环中推送到q2。主线程从q2读取数据并反馈给q1

如果只有一个工作线程(单一生产者/单一消费者),或者只要所有工作线程与主线程在同一个 CPU 上,这种方法就可以正常工作。但是,一旦有两个工作线程被显式调度到与主线程不同的 CPU 上,它就会失败。

下面的代码实现了这个测试

#include <pthread.h>
#include <thread>
#include <vector>

static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
                                   Queue<int> &queue_rd, Queue<int> &queue_wr) {
    for (size_t i = 0; i < (1UL << 24); i++) {
        auto res = queue_rd.pop();
        if (res) {
            queue_wr.push(res.value);
        }
    }
    done_count++;
}

static void set_thread_affinity(pthread_t thread, int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
                               &cpuset) != 0) {
        throw "Error while calling pthread_setaffinity_np";
    }
}

int main() {
    static constexpr uint32_t n_threads{2U}; // Number of worker threads
    //static constexpr uint32_t n_threads{1U}; // < Works fine
    static constexpr uint32_t max_size{16U}; // Elements in the queue
    std::atomic<uint32_t> done_count{0};     // Number of finished threads
    Queue<int> queue1(max_size), queue2(max_size);

    // Launch n_threads threads, make sure the main thread and the two worker
    // threads are on different CPUs.
    std::vector<std::thread> threads;
    for (uint32_t i = 0; i < n_threads; i++) {
        threads.emplace_back(queue_stress_test_main, std::ref(done_count),
                             std::ref(queue1), std::ref(queue2));
        set_thread_affinity(threads.back().native_handle(), 0);
    }
    set_thread_affinity(pthread_self(), 1);
    //set_thread_affinity(pthread_self(), 0); // < Works fine

    // Pump data from queue2 into queue1
    uint32_t elems_written = 0;
    while (done_count < n_threads || !queue2.empty()) {
        // Initially fill queue1 with all values from 0..max_size-1
        if (elems_written < max_size) {
            queue1.push(elems_written++);
        }

        // Read elements from queue2 and put them into queue1
        auto res = queue2.pop();
        if (res) {
            queue1.push(res.value);
        }
    }

    // Wait for all threads to finish
    for (uint32_t i = 0; i < n_threads; i++) {
        threads[i].join();
    }
}

大多数情况下,该程序会触发队列代码中的陷阱,这意味着pop() 会尝试读取push() 从未接触过的内存——尽管pop() 应该如果push() 的调用频率至少与pop() 一样,则成功。

您可以使用 GCC/clang 在 Linux 上编译和运行上述程序

c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue

或者直接拼接上面两个代码块或者下载完整的程序here

请注意,对于无锁数据结构,我完全是个新手。我非常清楚有很多经过实战考验的 C++ 无锁队列实现。但是,我根本无法弄清楚为什么上面的代码不能按预期工作。

【问题讨论】:

  • 无法重现您的问题:repl.it/repls/FlashyMediocreDebuggers
  • @Kapil:感谢您尝试这个!尽管(正如下面的 cmets 所指出的)该算法存在根本性的缺陷,但这只会在相当快的 SMP 机器上以高概率失败。最好在您可以完全控制的执行环境中尝试这种算法而无需任何虚拟化。另外,在线程中放置 print 语句是个坏主意,因为循环必须尽可能紧凑,以增加触发并发问题的机会。

标签: c++ lock-free


【解决方案1】:

您有两个错误,其中一个可能导致您观察到的失败。

让我们看看你的推送代码,除了我们将只允许每个语句进行一次操作:

void push(T t)
{
    auto const claimed_index = m_wr_ptr++;               /* 1 */
    auto const claimed_offset = claimed_index & m_mask; /* 2 */
    auto& claimed_data = m_data[claimed_offset];         /* 3 */
    claimed_data.store(t);                               /* 4 */
    m_size++;                                            /* 5 */
}

现在,对于有两个生产者的队列,操作 1 和 4 之间存在竞争条件的脆弱性窗口:

之前:

m_rd_ptr == 1
m_wr_ptr == 1
m_size == 0

制作人A:

/* 1 */ claimed_index = 1; m_wr_ptr = 2;
/* 2 */ claimed_offset = 1;
  • 调度程序让生产者 A 在这里休眠

制作人 B:

/* 1 */ claimed_index = 2; m_wr_ptr = 3;
/* 2 */ claimed_offset = 2;
/* 3 */ claimed_data = m_data[2];
/* 4 */ claimed_data.store(t);
/* 5 */ m_size = 1;

之后:

m_size == 1
m_rd_ptr == 1
m_wr_ptr == 3
m_data[1] == 0xDEADBEAF
m_data[2] == value_produced_by_B

消费者现在运行,看到m_size &gt; 0,并从m_data[1]读取,同时将m_rd_ptr从1增加到2。但是m_data[1]还没有被生产者A写入,生产者B写信给@987654330 @。

第二个错误是pop() 中的补充情况,当消费者线程在m_rd_ptr++ 操作和.load() 调用之间中断时。这可能会导致读取值乱序,甚至可能乱序到队列已完全环绕并覆盖原始值。

仅仅因为单个源语句中的两个操作是原子的,并不会使整个语句成为原子的。

【讨论】:

  • 对于这种全局操作顺序,您不需要线程休眠。我假设 OP 正在使用 SMP 系统,其中同时运行的两个线程的操作可以以这种方式交错。 (顺便说一句,我认为对于reserved_index,您打算再次编写claimed_index。)此外,操作/*2*//*3*/ 根本不是全局可观察的(它们不读取或写入任何共享变量),它是只是一个线程私下做的事情。无论如何,是的,错误在于m_size++ 没有为您提供有效读取的索引,因为推送不必按声明顺序完成。
  • @PeterCordes:确实没有必要抢占一个线程,但这就是在 OP 的测试运行中发生的情况,当两个生成的线程对同一个核心具有亲和力时。感谢您发现我在起草此答案时使用的变量名的剩余使用。没有全局可见性的指令仍然很重要,因为它们增加了漏洞的窗口——当您谈论多线程和竞争时,计时一个可见的副作用。
  • 哦,我没有注意到亲和力的恶作剧。既然已经有了答案,我就跳到它上面,找出这个长问题的关键部分是关于什么的。 :P 我猜 OP 希望将两个生产者放在同一个核心上可以避免排序问题?是的,关于更多操作-> 更多延迟的公平点,但这里实际上是 1 条额外的 CPU 指令 (and) 和 x86 或 ARM 上的基本 + 索引寻址模式。 (如果存储就像 SnB 系列上的负载,可能会增加 1 个地址生成延迟周期)。在 RISC 上,可能需要 1 或 2 条额外的地址生成指令。
  • 感谢您的 cmets!事实上,@Ben Voigt 描述的第一个问题正是导致问题的原因。我正在做线程亲和力“恶作剧”,仅仅是因为我担心缺少内存栅栏,显然情况并非如此。正如 Ben 所写,这个问题也应该出现在单处理器系统上。我将不得不考虑答案中描述的第二个问题是否是我特定用例中的问题,因为系统中的元素数量被限制为队列的最大大小。
  • @AndreasStöckel:是的,即使队列大小有限,这也是一个问题,因为剩余的 N-1 个项目可以在第 1 个项目完成处理之前多次通过队列。所以所有数组槽都可以被其他线程访问,而一个线程认为它具有独占访问权限。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多