【发布时间】:2019-01-08 12:18:24
【问题描述】:
我正在尝试优化 SPSC 队列中的消费者延迟,如下所示:
template <typename TYPE>
class queue
{
public:
void produce(message m)
{
const auto lock = std::scoped_lock(mutex);
has_new_messages = true;
new_messages.emplace_back(std::move(m));
}
void consume()
{
if (UNLIKELY(has_new_messages))
{
const auto lock = std::scoped_lock(mutex);
has_new_messages = false;
messages_to_process.insert(
messages_to_process.cend(),
std::make_move_iterator(new_messages.begin()),
std::make_move_iterator(new_messages.end()));
new_messages.clear();
}
// handle messages_to_process, and then...
messages_to_process.clear();
}
private:
TYPE has_new_messages{false};
std::vector<message> new_messages{};
std::vector<message> messages_to_process{};
std::mutex mutex;
};
这里的消费者尽可能避免为互斥锁的锁定/解锁付费,并在锁定互斥锁之前进行检查。
问题是:我必须使用TYPE = std::atomic<bool> 还是我可以节省原子操作和阅读 volatile bool 是否可以?
It's known that a volatile variable per se doesn't guarantee thread safety,然而,std::mutex::lock() 和 std::mutex::unlock() 提供了一些内存排序保证。我是否可以依靠他们对volatile bool has_new_messages 进行更改以最终对mutex 范围之外的消费者线程可见?
更新:在@Peter Cordes'advice 之后,我将其重写如下:
void produce(message m)
{
{
const auto lock = std::scoped_lock(mutex);
new_messages.emplace_back(std::move(m));
}
has_new_messages.store(true, std::memory_order_release);
}
void consume()
{
if (UNLIKELY(has_new_messages.exchange(false, std::memory_order_acq_rel))
{
const auto lock = std::scoped_lock(mutex);
messages_to_process.insert(...);
new_messages.clear();
}
}
【问题讨论】:
-
你可以依赖互斥锁/解锁,你不需要 volatile ,但不能超出范围
-
为什么
has_new_messages使用模板类型?不应该只是bool吗?还是这样你就可以做一个bool与volatile bool版本? -
@PeterCordes,是的,只是为了演示目的。这是针对
boolvsvolatile boolvsstd::atomic<bool>。 -
在
xchg上旋转是次优的,而pause是只读的,我认为。也许if(exchange) { ... } else { _mm_pause(); }也可以。通常你想旋转只读,如果你的只读检查表明它应该工作,只有xchg。但是与单独的读/写相比,交换似乎没有优势,如果没有其他线程可以从你下面消耗它。 -
在我看来,更好的锁/信号量应该能够完成整个工作,而不是让缓存线在内核之间弹跳以访问
has_new_messages以及互斥锁。不过,我不知道某种基于计数器的锁是否可以解决问题。
标签: c++ thread-safety x86-64 volatile stdatomic