【问题标题】:Troubles with simple Lock-Free MPSC Ring Buffer简单的无锁 MPSC 环形缓冲区的问题
【发布时间】:2019-09-27 15:49:56
【问题描述】:

我正在尝试实现一个基于数组的环形缓冲区,该缓冲区对于多个生产者和单个消费者来说是线程安全的。主要思想是具有原子头和尾索引。当将一个元素推入队列时,头部会自动增加以在缓冲区中保留一个槽:

#include <atomic>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>

template <class T> class MPSC {
private:
  int MAX_SIZE;

  std::atomic<int> head{0}; ///< index of first free slot
  std::atomic<int> tail{0}; ///< index of first occupied slot

  std::unique_ptr<T[]> data;
  std::unique_ptr<std::atomic<bool>[]> valid; ///< indicates whether data at an
                                              ///< index has been fully written

  /// Compute next index modulo size.
  inline int advance(int x) { return (x + 1) % MAX_SIZE; }

public:
  explicit MPSC(int size) {
    if (size <= 0)
      throw std::invalid_argument("size must be greater than 0");

    MAX_SIZE = size + 1;
    data = std::make_unique<T[]>(MAX_SIZE);
    valid = std::make_unique<std::atomic<bool>[]>(MAX_SIZE);
  }

  /// Add an element to the queue.
  ///
  /// If the queue is full, this method blocks until a slot is available for
  /// writing. This method is not starvation-free, i.e. it is possible that one
  /// thread always fills up the queue and prevents others from pushing.
  void push(const T &msg) {
    int idx;
    int next_idx;
    int k = 100;
    do {
      idx = head;
      next_idx = advance(idx);

      while (next_idx == tail) {     // queue is full
        k = k >= 100000 ? k : k * 2; // exponential backoff
        std::this_thread::sleep_for(std::chrono::nanoseconds(k));
      } // spin

    } while (!head.compare_exchange_weak(idx, next_idx));

    if (valid[idx])
      // this throws, suggesting that two threads are writing to the same index. I have no idea how this is possible.
      throw std::runtime_error("message slot already written");

    data[idx] = msg;
    valid[idx] = true; // this was set to false by the reader,
                       // set it to true to indicate completed data write
  }

  /// Read an element from the queue.
  ///
  /// If the queue is empty, this method blocks until a message is available.
  /// This method is only safe to be called from one single reader thread.
  T pop() {
    int k = 100;
    while (is_empty() || !valid[tail]) {
      k = k >= 100000 ? k : k * 2;
      std::this_thread::sleep_for(std::chrono::nanoseconds(k));
    } // spin
    T res = data[tail];
    valid[tail] = false;
    tail = advance(tail);
    return res;
  }

  bool is_full() { return (head + 1) % MAX_SIZE == tail; }

  bool is_empty() { return head == tail; }
};

当有很多拥塞时,一些消息会被其他线程覆盖。因此,我在这里所做的事情肯定存在根本性的问题。

似乎正在发生的事情是两个线程正在获取相同的索引来写入它们的数据。为什么会这样?

即使生产者在写入数据之前暂停,尾部也不能超过这个线程 idx,因此没有其他线程能够超越并声明相同的 idx。

编辑

冒着发布过多代码的风险,这里有一个重现问题的简单程序。它从多个线程发送一些递增的数字,并检查消费者是否收到了所有数字:

#include "mpsc.hpp" // or whatever; the above queue
#include <thread>
#include <iostream>

int main() {
  static constexpr int N_THREADS = 10; ///< number of threads
  static constexpr int N_MSG = 1E+5;   ///< number of messages per thread

  struct msg {
    int t_id;
    int i;
  };

  MPSC<msg> q(N_THREADS / 2);

  std::thread threads[N_THREADS];

  // consumer
  threads[0] = std::thread([&q] {
    int expected[N_THREADS] {};

    for (int i = 0; i < N_MSG * (N_THREADS - 1); ++i) {
      msg m = q.pop();
      std::cout << "Got message from T-" << m.t_id << ": " << m.i << std::endl;
      if (expected[m.t_id] != m.i) {
        std::cout << "T-" << m.t_id << " unexpected msg " << m.i << "; expected " << expected[m.t_id] << std::endl;
        return -1;
      }
      expected[m.t_id] = m.i + 1;
    }
  });

  // producers
  for (int id = 1; id < N_THREADS; ++id) {
    threads[id] = std::thread([id, &q] {
      for (int i = 0; i < N_MSG; ++i) {
        q.push(msg{id, i});
      }
    });
  }

  for (auto &t : threads)
    t.join();
}

【问题讨论】:

  • 恐怕你的问题离题了,因为它缺少minimal reproducible example。特别是缺少调用代码,整个template 的东西都是多余的。
  • @UlrichEckhardt 谢谢,我已经编辑了问题以包含一个重现问题的示例。
  • 当您检查 while (next_idx == tail) 时,这可能是错误的。但是之后这个检查和之前head.compare_exchange_weak(idx, next_idx) - 已经可以是next_idx == tail。而head 相同(head 进行完整循环(提前MAX_SIZE 次,而tail 一次)。这将在 2 元素缓冲区 MAX_SIZE == 2 上最明显。说 (h,t) 对 -最初是 (0,0)。当线程 #1 在这里检查 next_idx == tail 一切正常。然后另一个线程执行 - (0,0)-push->(1,0)-pop->(1, 1)-push->(0,1) 所以现在线程 #1 在 CAS 时间处于 (0,1) 状态。head == 0,但 t == 1。队列已满

标签: c++ multithreading concurrency lock-free


【解决方案1】:

我正在尝试实现一个基于数组的环形缓冲区,它对于多个生产者和单个消费者来说是线程安全的。

我假设您这样做是为了学习。如果你想解决一个真正的问题,你自己实现一个无锁队列很可能是错误的。

似乎正在发生的是两个线程正在获取相同的索引来写入它们的数据。为什么会这样?

生产者自旋锁与外部 CAS 循环的组合无法按预期方式工作:

do {
  idx = head;
  next_idx = advance(idx);

  while (next_idx == tail) {     // queue is full
    k = k >= 100000 ? k : k * 2; // exponential backoff
    std::this_thread::sleep_for(std::chrono::nanoseconds(k));
  } // spin

// 
// ...
//
// All other threads (producers and consumers) can progress.
//
// ...
//

} while (!head.compare_exchange_weak(idx, next_idx));

CAS 发生时队列可能已满,因为这些检查是独立执行的。此外,CAS 可能会成功,因为其他线程可能已经将head 提前到与idx 完全匹配。

【讨论】:

  • 谢谢,好像差不多了!以下是我目前对问题如何出现的理解:T1 选择一些索引并最终移出内部自旋循环,然后暂停。 T2 推送一些消息,阅读器将尾部推进到 T1 选择的索引加 1。然后 T2 填充队列并开始旋转。 T1 回来了,由于头部再次位于索引处,因此在 cas 中成功。因此队列现在被认为是空的,因为 T1 被推到了一个满队列。 T2 可以再次覆盖来自 T1 的消息。
  • @ionree:我不能完全按照你的描述,但我想到的场景至少是相似的,是的。根本问题是任何线程都可能在“任何”执行点暂停。结果,这些单独的检查不能很好地协同工作。可能有生产者线程在内部循环之前或之后以其“旧”值next_id 唤醒。如果您还允许 head 回绕,那么无论队列的剩余状态如何,CAS 都可能以 next_id 的“旧”值成功。
猜你喜欢
  • 2012-04-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-06-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多