【问题标题】:Dequeue from a ConcurrentQueue only if a condition is met仅在满足条件时才从 ConcurrentQueue 中出列
【发布时间】:2015-05-19 21:55:52
【问题描述】:

只有在满足某些条件时,我才能将ConcurrentQueue 的下一个元素出列?

例如如果下一个要出队的项目满足某个条件,则将其出队,否则离开。本质上是'DequeueIf''TryDequeueIf' 方法

例子:

var myQueue = new ConcurrentQueue<int>()
...
int item;
// only dequeue if the next item is 0
bool success = myQueue.TryDequeueIf(out item, x=>x==0) 

当然可以先调用TryPeek,检查条件,然后TryDequeue,但这不再是线程安全的。

我可以将整个TryPeek &amp; TryDequeue 包裹在一个锁中,但这有点违背了使用 ConcurrentQueue 的目的;并且意味着所有常规的无条件出队也必须被锁定。我不确定我是否必须锁定每个Enqueue 才能保存。如果可能的话,我想避免实施我自己的锁定策略可能出现的陷阱。

是否有使用 .net4.0 ConcurrentQueue 类或其他并发类之一的无锁解决方案?

【问题讨论】:

    标签: c# multithreading concurrency locking queue


    【解决方案1】:

    使用内置方法无法做到这一点。怎么办?

    1. 编写您自己的简单队列。只需为每个队列使用一个锁。除非您在队列中有 非常 高流量,否则这将执行得很好。非竞争锁每个周期消耗两个互锁操作。
    2. 使用联锁操作编写复杂的队列。使用 CAS 重试循环,您可以实现谓词原子获取操作。您可能可以使用 BCL 源代码作为起点或灵感。
    3. 放弃排队的想法。在非队列上执行此操作很容易。如果您绘制了错误的项目,只需将其重新插入队列即可。

    【讨论】:

      【解决方案2】:

      作为@usr answer 的附加组件:

      我可以将整个 TryPeekTryDequeue 包裹在一个锁中,但这有点违背了使用 ConcurrentQueue 的目的

      不是真的,或者至少不是完全的,因为它的主要目的之一是协调生产者/消费者的集合点,在这种情况下使用锁不会阻塞任何生产者,只会阻塞其他消费者。但这确实意味着您将需要对所有其他变异读取操作使用相同的锁。

      因此,如果这是不可接受的,您将不得不这样做

      1. 滚动您自己的并发队列或针对您的特定使用模式优化的更好的数据结构(堆?)实现。
      2. 考虑一种更适合您需求的替代方法:使用基于“推”或“事件”的模型,而不是基于“拉”的模型。使用例如使用 where 过滤器观察的 Rx 和消费者。

      【讨论】:

        【解决方案3】:

        大多数concurrentQueue实现都使用无锁实现,以允许许多生产者和消费者同时写入队列而不会互相阻塞(除了偶尔的spin-retry)

        条件出队的基本要求是'pop if on top'函数的实现:'bool dequeueIf(T)//如果T仍然是头,则出队T,如果不是头则返回false;这使您可以:

        T t=null;
        do{ // need to spin in case the head changes between the peek and dequeue attempt
            done=-1; // assume failure
            T t2=peek(), 
            if (t!=null && someComplexCondition(t)) {// then it's what we want
                if(!dequeueIf(T)) // check to see if the head is changed
                     {done=0;} // spin if the head is changed
                else {done=1;t=t2;} // it meets the criteria, and we de-queued it.
            }
        } while (done==0);
        // done = -1 == head does not meet criteria
        // done = 1 == T contains poped item that met criteria
        

        dequeueIf(T) 需要在队列内部实现。

        如果是 .Net ConcurrentQueue,则需要修改版本的 Dequeue(out T result):Dequeue(ref T toRemove)。

        这将传递给 concurrentQueue.Segment.TryRemove(out T result) 的新版本:concurrentQueue.Segment.TryRemove(ref T toRemove)

        此函数需要在执行 compareExchange 以出队之前,首先将 'local' 处的 T 与 toRemove 进行比较,并且仅在它仍然位于顶部时才将其删除(否则它可以假定另一个线程 TryRemove 已经弹出队列中的值,因此您不能返回有问题的值;然后 toRemove 应该为空)。

        注意;你不能重载 f(out T) 和 f(ref T),所以返回 bool 表示你现在“拥有”你请求弹出的 T 似乎是一个合理的解决方案,否则我会使用 f(T toRemove ) 以允许原始函数签名保持不变。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2018-01-22
          • 1970-01-01
          • 2011-05-13
          • 2013-06-11
          • 2020-01-10
          • 2021-08-25
          • 2012-12-25
          相关资源
          最近更新 更多