【问题标题】:Are there any implementations of concurrent lock-free blocking queues?有没有并发无锁阻塞队列的实现?
【发布时间】:2011-04-20 17:55:58
【问题描述】:

我知道阻塞队列和无锁队列,Scott et al. 提供了这些实现的一个很好的例子,但是有没有无锁阻塞队列的实现?

在无锁阻塞队列中,出队不需要锁定,但如果队列中没有项目,它将阻塞消费者。有没有这种野兽的实现?我更喜欢它们是 C# 实现,但任何实现在技术上都可以工作。

更新:

我想我最终会在 D14.1 行出现竞争条件:

initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–>next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–>Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–>Head // Read Head
D3:         tail = Q–>Tail // Read Tail
D4:         next = head–>next // Read Head.ptr–>next
D5:         if head == Q–>Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–>value
D13:                if CAS(&Q–>Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded

【问题讨论】:

  • 您的 pdf 链接不正确,您可以修复它吗?
  • 链接是research.ibm.com/people/m/michael/podc-1996.pdf。他错过了结尾的“f”。
  • @Will,抱歉,我的链接在 pdf 中缺少 f,现在应该没问题(感谢您指出)。
  • 注意:.NET 4 带来了很多的并发和无锁实现。
  • 感谢所有固定链接。

标签: c# multithreading concurrency lock-free blockingqueue


【解决方案1】:

编辑:

更简单: 我建议您的队列不需要头部和尾部。只要有头。如果head = NULL,则列表为空。将项目添加到头部。从头部移除物品。更简单、更少的 CAS 操作。

助手: 我在 cmets 中建议您需要考虑一个辅助方案来处理比赛。在我的“无锁”版本中,如果它们不会引起问题,那么可以有罕见的竞争条件。我喜欢额外的性能,而不是让空闲线程休眠几毫秒。

帮助者的想法。当消费者抓住工作时,它可以检查是否有处于昏迷状态的线程。当生产者添加工作时,它可以寻找处于昏迷状态的线程。

所以跟踪睡眠者。使用睡眠者的链接列表。当一个线程决定没有工作时,它会将自己标记为 !awake 并将 CAS 自己标记为 sleeper 列表的头部。当接收到唤醒信号时,线程将自身标记为已唤醒。然后新唤醒的线程清理 sleeper 列表。要清理并发单链表,你必须小心。你只能对头部进行CAS。因此,当睡眠者列表的头部标记为唤醒时,您可以 CAS 关闭头部。如果头部没有醒来,继续扫描列表并“懒惰地取消链接”(我把那个术语做了)剩余的清醒项目。延迟取消链接很简单......只需将上一个项目的下一个 ptr 设置在唤醒项目上。并发扫描仍然会到达列表的末尾,即使它到达 !awake 的项目。随后的扫描会看到一个较短的列表。最后,每当您添加工作或完成工作时,请扫描睡眠者列表以查找 !awake 项目。如果消费者在抓取一些工作后注意到工作仍然存在(.next work != NULL),消费者可以扫描睡眠者列表并通知第一个线程是 !awake。生产者添加工作后,生产者可以扫描 sleeper 列表并执行相同操作。

如果您有一个广播场景并且无法向单个线程发出信号,那么只需保持睡眠线程的计数即可。虽然该计数仍然 > 0,但消费者注意到剩余工作和消费者添加工作会广播唤醒信号。

在我们的环境中,每个 SMT 有 1 个线程,因此休眠列表永远不会那么大(除非我得到其中一台新的 128 个并发线程机器!)我们在事务的早期生成工作项。在第一秒内,我们可能会生成 10,000 个工作项,而这种生产会迅速减少。线程在这些工作项上工作了几秒钟。所以,我们很少有空闲池上的线程。

你仍然可以使用锁 如果您只有 1 个线程并且很少生成工作……这对您不起作用。在这种情况下,互斥体的性能无关紧要,您应该只使用它们。在这种情况下,对睡眠队列使用锁。将无锁视为“重要的地方没有锁”。

上一篇文章: 你是说: 有一个工作队列。 有很多消费者线程。 如果有任何工作,消费者需要拉出工作并去做 消费者线程需要休眠直到有工作。

如果你是这样,我们只使用原子操作来做到这一点:

工作队列是一个链表。还有一个休眠线程的链表。

要添加作品:CAS 列表的头部到新作品。添加工作时,我们检查睡眠列表中是否有任何线程。如果有,在添加工作之前,我们将一个睡眠者从睡眠者列表中移除,设置其工作=新工作,然后通知睡眠者唤醒。我们将工作添加到工作队列中。

要消耗工作:将列表的头部CAS 转换为head->next。如果工作列表的头部为 NULL,我们将线程 CAS 到睡眠者列表。

一旦线程有一个工作项,该线程必须将该工作项的状态CAS 设置为WORK_INPROGRESS 或类似的。如果失败,则意味着工作正在由另一个人执行,因此消费者线程返回搜索工作。如果一个线程被唤醒并且有一个工作项,它仍然需要对状态进行 CAS。

因此,如果添加工作,睡觉的消费者总是会被唤醒并交给工作。 pthread_kill() 总是在 sigwait() 唤醒线程,因为即使线程在信号之后到达 sigwait,也会收到信号。这解决了线程将自己置于睡眠者列表但在进入睡眠之前收到信号的问题。所发生的只是线程试图拥有它的 ->work(如果有的话)。未能拥有工作或没有工作将线程发送回消费开始。如果一个线程未能 CAS 到 sleeper 列表,则意味着另一个线程击败了它,或者生产者拉下了一个 sleeper。为了安全起见,我们让线程的行为就像它刚刚被唤醒一样。

我们这样做没有竞争条件,并且有多个生产者和消费者。我们还能够扩展它以允许线程也可以在单个工作项上休眠。

【讨论】:

  • @johnnycrash 不,我的意思是消费者从队列中拉出一个项目,直到队列中没有任何内容。当消费者调用 dequeue 并且队列中没有任何内容时,消费者将在 dequeue 调用中被阻塞(这是标准的阻塞队列,我已经拥有)。有关两个队列的描述,请参阅参考论文...基本上阻塞队列锁定出队,无锁队列不锁定(如您所说,它使用 CAS),但如果没有任何内容,则无锁队列返回队列,我希望无锁队列阻塞而不是返回。
  • @Lirik,当队列为空时,消费者调用sigwait。你说的阻塞是这个意思吗?此外,我们还没有在队列中发现头尾的​​用途。我们只使用一个头,入队和出队要简单得多。
  • @Lirik,当线程被添加到休眠列表并调用 sigwait() 时,我们的 GetWork() 函数会阻塞。 sigwait() 使线程休眠。稍后当工作出现时,我们调用 pthread_kill() 来唤醒线程。我不知道 .Net 是否有办法像 linux 那样让线程休眠并通过信号唤醒它。但是,长话短说......当队列为空时我们会阻塞,并且我们不会在任何地方使用锁。
  • @johnnycrash 我现在关注你了!好的,我想我知道如何完成这项工作:我可以在 ManualResetEvent 上使用 dequeue 块,直到队列中有东西(ManualResetEvent 将在 enqueue 上设置并在 dequeue 上重置当队列为空时)。今晚晚些时候我会试着记下一些代码......我想我现在明白了。
  • @Lirik,酷!我相信,它工作的关键是,如果您将工作添加到队列中,您总是会唤醒睡眠者。如果没有睡眠者......不要唤醒任何东西。将自己置于卧铺名单后,注意完成睡眠的比赛。如果生产者在线程完成睡眠之前向该线程发出信号,您可能进入所有线程都处于睡眠状态的情况。我们没有看到这种情况发生,因为 sigwait 缓存了信号。现在你让我想到 sigwait() arrrrrgh 中可能有一个互斥锁!
【解决方案2】:

.NET 并行扩展:(内置,用于 .NET 4.0+):

http://blogs.msdn.com/b/pfxteam/archive/2010/01/26/9953725.aspx


来自 StackOverflow 实现的某人:

Lock free constructs in .net



回应 cmets 中的澄清:

如果空时阻塞不忙(等待信号),那么您似乎需要一个计数信号量来等待。

另一种方法是使用常规队列,结合原子比较和交换或自旋锁来防止同时访问,
那么如果消费者线程在队列为空时尝试进入,锁定二进制信号量,
如果提供者线程在队列为空时尝试进入,则解锁二进制信号量以唤醒所有休眠消费者(并将它们返回自旋锁,以便多个线程只有在队列中有足够的项目时才能进入)。

例如// 伪代码

/// Non-blocking lock (busy wait)
void SpinLock()
{
    While (CompareAndExchange(myIntegerLock, -1, 0) != 0)
    {
        // wait
    }
}

void UnSpinLock()
{
    Exchange(myIntegerLock, 0);
}

void AddItem(item)
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    queue.Push(item);

    // Unblock any blocked consumers
    if (queue.Count() == 1)
    {
        semaphore.Increase();
    }

    // End of CAS synchronization block
    UnSpinLock();
}

Item RemoveItem()
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    // If empty then block
    if (queue.Count() == 0)
    {
        // End of CAS synchronization block
        UnSpinLock();

        // Block until queue is not empty
        semaphore.Decrease();

        // Try again (may fail again if there is more than one consumer)
        return RemoveItem();
    }

    result = queue.Pop();

    // End of CAS synchronization block
    UnSpinLock();

    return result;
}

【讨论】:

  • @Danny 感谢您提供的链接,但我专门寻找无锁阻塞队列,从我读到的内容来看,这似乎是一件很难实现的事情。我想知道是否有人真的设法让一个工作。
  • 如果队列忙,阻塞客户端线程? - 如果是这样,围绕队列的自旋锁(互斥锁/忙等待)应该可以解决问题。
  • @Danny 如果队列为空则阻塞,当队列中有项目时不应该有锁。 Scott 的论文展示了两种实现:无锁队列和阻塞队列……它们都是用 java 实现的。我没有看到无锁和阻塞队列(即只有在它为空时才阻塞)。
  • @lukas ConcurrentBag 非常好,但是使用了锁定:stackoverflow.com/questions/4785622/…
  • @Danny 看到我的编辑,我添加了伪代码和我“设想”的解决方案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-11-15
  • 2013-07-25
相关资源
最近更新 更多