【发布时间】:2019-09-27 15:49:56
【问题描述】:
我正在尝试实现一个基于数组的环形缓冲区,该缓冲区对于多个生产者和单个消费者来说是线程安全的。主要思想是具有原子头和尾索引。当将一个元素推入队列时,头部会自动增加以在缓冲区中保留一个槽:
#include <atomic>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>
template <class T> class MPSC {
private:
int MAX_SIZE;
std::atomic<int> head{0}; ///< index of first free slot
std::atomic<int> tail{0}; ///< index of first occupied slot
std::unique_ptr<T[]> data;
std::unique_ptr<std::atomic<bool>[]> valid; ///< indicates whether data at an
///< index has been fully written
/// Compute next index modulo size.
inline int advance(int x) { return (x + 1) % MAX_SIZE; }
public:
explicit MPSC(int size) {
if (size <= 0)
throw std::invalid_argument("size must be greater than 0");
MAX_SIZE = size + 1;
data = std::make_unique<T[]>(MAX_SIZE);
valid = std::make_unique<std::atomic<bool>[]>(MAX_SIZE);
}
/// Add an element to the queue.
///
/// If the queue is full, this method blocks until a slot is available for
/// writing. This method is not starvation-free, i.e. it is possible that one
/// thread always fills up the queue and prevents others from pushing.
void push(const T &msg) {
int idx;
int next_idx;
int k = 100;
do {
idx = head;
next_idx = advance(idx);
while (next_idx == tail) { // queue is full
k = k >= 100000 ? k : k * 2; // exponential backoff
std::this_thread::sleep_for(std::chrono::nanoseconds(k));
} // spin
} while (!head.compare_exchange_weak(idx, next_idx));
if (valid[idx])
// this throws, suggesting that two threads are writing to the same index. I have no idea how this is possible.
throw std::runtime_error("message slot already written");
data[idx] = msg;
valid[idx] = true; // this was set to false by the reader,
// set it to true to indicate completed data write
}
/// Read an element from the queue.
///
/// If the queue is empty, this method blocks until a message is available.
/// This method is only safe to be called from one single reader thread.
T pop() {
int k = 100;
while (is_empty() || !valid[tail]) {
k = k >= 100000 ? k : k * 2;
std::this_thread::sleep_for(std::chrono::nanoseconds(k));
} // spin
T res = data[tail];
valid[tail] = false;
tail = advance(tail);
return res;
}
bool is_full() { return (head + 1) % MAX_SIZE == tail; }
bool is_empty() { return head == tail; }
};
当有很多拥塞时,一些消息会被其他线程覆盖。因此,我在这里所做的事情肯定存在根本性的问题。
似乎正在发生的事情是两个线程正在获取相同的索引来写入它们的数据。为什么会这样?
即使生产者在写入数据之前暂停,尾部也不能超过这个线程 idx,因此没有其他线程能够超越并声明相同的 idx。
编辑
冒着发布过多代码的风险,这里有一个重现问题的简单程序。它从多个线程发送一些递增的数字,并检查消费者是否收到了所有数字:
#include "mpsc.hpp" // or whatever; the above queue
#include <thread>
#include <iostream>
int main() {
static constexpr int N_THREADS = 10; ///< number of threads
static constexpr int N_MSG = 1E+5; ///< number of messages per thread
struct msg {
int t_id;
int i;
};
MPSC<msg> q(N_THREADS / 2);
std::thread threads[N_THREADS];
// consumer
threads[0] = std::thread([&q] {
int expected[N_THREADS] {};
for (int i = 0; i < N_MSG * (N_THREADS - 1); ++i) {
msg m = q.pop();
std::cout << "Got message from T-" << m.t_id << ": " << m.i << std::endl;
if (expected[m.t_id] != m.i) {
std::cout << "T-" << m.t_id << " unexpected msg " << m.i << "; expected " << expected[m.t_id] << std::endl;
return -1;
}
expected[m.t_id] = m.i + 1;
}
});
// producers
for (int id = 1; id < N_THREADS; ++id) {
threads[id] = std::thread([id, &q] {
for (int i = 0; i < N_MSG; ++i) {
q.push(msg{id, i});
}
});
}
for (auto &t : threads)
t.join();
}
【问题讨论】:
-
恐怕你的问题离题了,因为它缺少minimal reproducible example。特别是缺少调用代码,整个
template的东西都是多余的。 -
@UlrichEckhardt 谢谢,我已经编辑了问题以包含一个重现问题的示例。
-
当您检查
while (next_idx == tail)时,这可能是错误的。但是之后这个检查和之前head.compare_exchange_weak(idx, next_idx)- 已经可以是next_idx == tail。而head相同(head进行完整循环(提前MAX_SIZE次,而tail一次)。这将在 2 元素缓冲区MAX_SIZE == 2上最明显。说 (h,t) 对 -最初是 (0,0)。当线程 #1 在这里检查next_idx == tail一切正常。然后另一个线程执行 - (0,0)-push->(1,0)-pop->(1, 1)-push->(0,1) 所以现在线程 #1 在 CAS 时间处于 (0,1) 状态。head == 0,但 t == 1。队列已满
标签: c++ multithreading concurrency lock-free