【发布时间】:2011-02-26 01:58:19
【问题描述】:
有没有“optmistic approach to lock-free FIFO queues" algorithm”的C++实现(源代码)?
【问题讨论】:
标签: c++ multithreading queue lock-free fifo
有没有“optmistic approach to lock-free FIFO queues" algorithm”的C++实现(源代码)?
【问题讨论】:
标签: c++ multithreading queue lock-free fifo
Herb Sutter 在 Dr. Dobbs Journal 的 Effective Concurency 专栏中涵盖了这样一个队列。
【讨论】:
我想总结一下greyfade给出的答案,它基于http://www.drdobbs.com/high-performance-computing/212201163(文章的最后部分),优化后的代码将是(进行一些修改以适应我的命名和编码约定): `
template <typename T> class LFQueue {
private:
struct LFQNode {
LFQNode( T* val ) : value(val), next(nullptr) { }
T* value;
AtomicPtr<LFQNode> next;
char pad[CACHE_LINE_SIZE - sizeof(T*) - sizeof(AtomicPtr<LFQNode>)];
};
char pad0[CACHE_LINE_SIZE];
LFQNode* first; // for one consumer at a time
char pad1[CACHE_LINE_SIZE - sizeof(LFQNode*)];
InterlockedFlag consumerLock; // shared among consumers
char pad2[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
LFQNode* last; // for one producer at a time
char pad3[CACHE_LINE_SIZE - sizeof(LFQNode*)];
InterlockedFlag producerLock; // shared among producers
char pad4[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
public:
LFQueue() {
first = last = new LFQNode( nullptr ); // no more divider
producerLock = consumerLock = false;
}
~LFQueue() {
while( first != nullptr ) {
LFQNode* tmp = first;
first = tmp->next;
delete tmp;
}
}
bool pop( T& result ) {
while( consumerLock.set(true) )
{ } // acquire exclusivity
if( first->next != nullptr ) { // if queue is nonempty
LFQNode* oldFirst = first;
first = first->next;
T* value = first->value; // take it out
first->value = nullptr; // of the Node
consumerLock = false; // release exclusivity
result = *value; // now copy it back
delete value; // and clean up
delete oldFirst; // both allocations
return true; // and report success
}
consumerLock = false; // release exclusivity
return false; // queue was empty
}
bool push( const T& t ) {
LFQNode* tmp = new LFQNode( t ); // do work off to the side
while( producerLock.set(true) )
{ } // acquire exclusivity
last->next = tmp; // A: publish the new item
last = tmp; // B: not "last->next"
producerLock = false; // release exclusivity
return true;
}
};
`
另一个问题是你如何定义CACHE_LINE_SIZE?它在 CPU 上有所不同,对吧?
【讨论】:
64 字节是一个不错的选择。但您可能希望平衡它与大小,所以我建议查看您的目标 CPU 并选择适合您希望定位的最常见目标的大小。
这是我实现的无锁 FIFO。
确保 T 的每一项都是 64 字节(Intel CPU 中的缓存行大小)的倍数,以避免错误共享。
这段代码用 gcc/mingw 编译,应该用 clang 编译。它针对 64 位进行了优化,因此要使其在 32 位上运行需要进行一些重构。
https://github.com/vovoid/vsxu/blob/master/engine/include/vsx_fifo.h
vsx_fifo<my_struct, 512> my_fifo;
发件人:
my_struct my_struct_inst;
... fill it out ...
while (!my_fifo.produce(my_struct_inst)) {}
接收者:
my_struct my_struct_recv;
while(my_fifo.consume(my_struct_recv))
{
...do stuff...
}
【讨论】:
这个lfqueue怎么样
这是一个跨平台、无限入队线程安全队列,已经过测试multi deq、multi enq-deq 和multi enq。保证内存安全。
例如
int* int_data;
lfqueue_t my_queue;
if (lfqueue_init(&my_queue) == -1)
return -1;
/** Wrap This scope in other threads **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&my_queue, int_data) == -1) {
printf("ENQ Full ?\n");
}
/** Wrap This scope in other threads **/
/*Dequeue*/
while ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
printf("DEQ EMPTY ..\n");
}
// printf("%d\n", *(int*) int_data );
free(int_data);
/** End **/
lfqueue_destroy(&my_queue);
【讨论】:
如果您正在寻找一个好的无锁队列实现,Microsoft Visual Studio 2010 和 Intel 的 Thread Building Blocks 都包含一个很好的 LF 队列,与论文类似。
【讨论】: