【问题标题】:Does an optimistic lock-free FIFO queue implementation exist?是否存在乐观的无锁 FIFO 队列实现?
【发布时间】:2011-02-26 01:58:19
【问题描述】:

有没有“optmistic approach to lock-free FIFO queues" algorithm”的C++实现(源代码)?

【问题讨论】:

    标签: c++ multithreading queue lock-free fifo


    【解决方案1】:

    Herb Sutter 在 Dr. Dobbs Journal 的 Effective Concurency 专栏中涵盖了这样一个队列。

    Writing Lock-Free Code: A Corrected Queue

    【讨论】:

    • 这是理论,我要的是实现。有没有实现这些算法的源代码或库?
    • 你读过这篇文章吗? 第 2 页是带注释的源代码。
    • 好的,很抱歉,我希望它被包装为库或其他东西......所以我只是将源代码包含到我的项目中并使用它。与我上面提到的论文相比,这个算法有什么基准吗?
    • 比较,没有。但在他的后续文章中有一些基准,链接自他的网站:EC #16 到 EC #18。
    • 你有 atomic 的那些实现吗?
    【解决方案2】:

    我想总结一下greyfade给出的答案,它基于http://www.drdobbs.com/high-performance-computing/212201163(文章的最后部分),优化后的代码将是(进行一些修改以适应我的命名和编码约定): `

    template <typename T> class LFQueue {
    private:
        struct LFQNode {
            LFQNode( T* val ) : value(val), next(nullptr) { }
            T* value;
            AtomicPtr<LFQNode> next;
            char pad[CACHE_LINE_SIZE - sizeof(T*) - sizeof(AtomicPtr<LFQNode>)];
        };
    
        char pad0[CACHE_LINE_SIZE];
        LFQNode* first;                 // for one consumer at a time
        char pad1[CACHE_LINE_SIZE - sizeof(LFQNode*)];
        InterlockedFlag consumerLock;   // shared among consumers
        char pad2[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
        LFQNode* last;                  // for one producer at a time
        char pad3[CACHE_LINE_SIZE - sizeof(LFQNode*)];
        InterlockedFlag producerLock;   // shared among producers
        char pad4[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
    public:
        LFQueue() {
            first = last = new LFQNode( nullptr ); // no more divider
            producerLock = consumerLock = false;
        }
    
        ~LFQueue() {
            while( first != nullptr ) {
                LFQNode* tmp = first;
                first = tmp->next;
                delete tmp;
            }
        }
    
        bool pop( T& result ) {
            while( consumerLock.set(true) ) 
            { }                             // acquire exclusivity
            if( first->next != nullptr ) {  // if queue is nonempty 
                LFQNode* oldFirst = first;
                first = first->next;
                T* value = first->value;    // take it out
                first->value = nullptr;     // of the Node
                consumerLock = false;       // release exclusivity
                result = *value;            // now copy it back
                delete value;               // and clean up
                delete oldFirst;            // both allocations
                return true;                // and report success
            }
            consumerLock = false;           // release exclusivity
            return false;                   // queue was empty
        }
    
        bool push( const T& t )  {
            LFQNode* tmp = new LFQNode( t );    // do work off to the side
            while( producerLock.set(true) ) 
            { }                             // acquire exclusivity
            last->next = tmp;               // A: publish the new item
            last = tmp;                     // B: not "last->next"
            producerLock = false;           // release exclusivity
            return true;
        }
    };
    

    `

    另一个问题是你如何定义CACHE_LINE_SIZE?它在 CPU 上有所不同,对吧?

    【讨论】:

    • 我认为,64 字节是一个不错的选择。但您可能希望平衡它与大小,所以我建议查看您的目标 CPU 并选择适合您希望定位的最常见目标的大小。
    • 请注意:这不是一个论坛,所以不能假设人们“浏览主题”。如果您想问其他问题,最好使用“”字段而不是“您的答案”字段。
    • 我确实在重新回答这个问题,但我在回答字段中提问错了,我应该在我自己的新答案下添加新评论。对此感到抱歉。
    • 我已经完成了针对 std::queue 在 windows 中使用 CRITICAL_SECTION 锁定的基准测试,无锁队列实际上比 std 的实现慢 2~3 倍 ::带锁的队列。你知道为什么吗?是因为链表吗?
    • 哎哟。缓存行对齐黑客是丑陋的。当您的代码在具有不同缓存安排的 CPU 上运行时会发生什么?此外,该结构正在消耗大量缓存。 L1 缓存是一种稀缺资源。我可以理解它正在完成,否则你们物理上的绑定实体在逻辑上是分开的,但仍然 - 哎哟。贵。
    【解决方案3】:

    这是我实现的无锁 FIFO。

    确保 T 的每一项都是 64 字节(Intel CPU 中的缓存行大小)的倍数,以避免错误共享。

    这段代码用 gcc/mingw 编译,应该用 clang 编译。它针对 64 位进行了优化,因此要使其在 32 位上运行需要进行一些重构。

    https://github.com/vovoid/vsxu/blob/master/engine/include/vsx_fifo.h

    vsx_fifo<my_struct, 512> my_fifo;
    

    发件人:

    my_struct my_struct_inst;
    ... fill it out ...
    while (!my_fifo.produce(my_struct_inst)) {}
    

    接收者:

    my_struct my_struct_recv;
    while(my_fifo.consume(my_struct_recv)) 
    { 
      ...do stuff...
    }
    

    【讨论】:

      【解决方案4】:

      这个lfqueue怎么样

      这是一个跨平台、无限入队线程安全队列,已经过测试multi deq、multi enq-deq 和multi enq。保证内存安全。

      例如

      int* int_data;
      lfqueue_t my_queue;
      
      if (lfqueue_init(&my_queue) == -1)
          return -1;
      
      /** Wrap This scope in other threads **/
      int_data = (int*) malloc(sizeof(int));
      assert(int_data != NULL);
      *int_data = i++;
      /*Enqueue*/
       while (lfqueue_enq(&my_queue, int_data) == -1) {
          printf("ENQ Full ?\n");
      }
      
      /** Wrap This scope in other threads **/
      /*Dequeue*/
      while  ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
          printf("DEQ EMPTY ..\n");
      }
      
      // printf("%d\n", *(int*) int_data );
      free(int_data);
      /** End **/
      
      lfqueue_destroy(&my_queue);
      

      【讨论】:

        【解决方案5】:

        如果您正在寻找一个好的无锁队列实现,Microsoft Visual Studio 2010 和 Intel 的 Thread Building Blocks 都包含一个很好的 LF 队列,与论文类似。

        Here's a link to the one in VC 2010

        【讨论】:

        • 我尝试了 vs2010 并进行了基准测试,它在小数据集上比 "std::queue with lock" 快,但在大数据集上呈指数级慢
        猜你喜欢
        • 2023-04-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-04-30
        • 2017-08-08
        • 2013-10-27
        • 1970-01-01
        • 2013-09-16
        相关资源
        最近更新 更多