【问题标题】:ConcurrentQueue that allows me to wait on one producer允许我等待一个生产者的 ConcurrentQueue
【发布时间】:2015-12-18 10:48:43
【问题描述】:

我有生产者/消费者的问题。目前我有一个简单的Queuelock 包围。

我正在尝试用更高效的方法来替换它。

我的第一选择是使用ConcurrentQueue,但我不知道如何让我的消费者等待下一条生成的消息(不执行 Thread.Sleep)。

另外,如果队列的大小达到特定数字,我希望能够清除整个队列。

您能否推荐一些符合我要求的现有类或实现?

【问题讨论】:

  • 你试过BlockingCollection类吗?
  • 看看this question。虽然它是关于多个制作人的,但如果你有一个制作人也没关系。
  • 您能否详细说明“如果队列大小达到...则清除整个队列”?对于其他需求,BlockingCollection 是理想的选择。
  • @YacoubMassad 实际上我确实看到了它,但最初,从它的名字来看,我认为它是 ConcurrentQueueCollection 风格,它不处理 FIFO 行为。不过好像不是这样,我现在就测试一下
  • 您能否提供更多关于清算要求的背景信息?

标签: c# .net multithreading concurrency


【解决方案1】:

这是一个示例,说明如何使用 BlockingCollection 类来做你想做的事:

BlockingCollection<int> blocking_collection = new BlockingCollection<int>();

//Create producer on a thread-pool thread
Task.Run(() =>
{
    int number = 0;

    while (true)
    {
        blocking_collection.Add(number++);

        Thread.Sleep(100); //simulating that the producer produces ~10 items every second
    }
});

int max_size = 10; //Maximum items to have

int items_to_skip = 0;

//Consumer
foreach (var item in blocking_collection.GetConsumingEnumerable())
{
    if (items_to_skip > 0)
    {
        items_to_skip--; //quickly skip items (to meet the clearing requirement)
        continue;
    }

    //process item
    Console.WriteLine(item);

    Thread.Sleep(200); //simulating that the consumer can only process ~5 items per second

    var collection_size = blocking_collection.Count;

    if (collection_size > max_size) //If we reach maximum size, we flag that we want to skip items
    {
        items_to_skip = collection_size;
    }
}

【讨论】:

  • Nice :) 在我最初的实现中,我做了一个while(blocking_collection.TryTake(out obj)){}(如果我错过了一个元素,这没什么大不了的。但我觉得你的实现更快更精确!对于Peek 实现,我用FirstOrDefault(),你怎么看?我知道它不会阻塞其他东西,但在我的情况下,没关系
  • 能解释一下为什么要偷看吗?也许有更好的方法来做到这一点。
猜你喜欢
  • 1970-01-01
  • 2014-04-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-04-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多