【问题标题】:Multiple consumers and querying a C# BlockingCollection多个消费者并查询 C# BlockingCollection
【发布时间】:2011-09-23 11:19:34
【问题描述】:

我正在使用 .NET 4.0 BlockingCollection 来处理一个项目队列,每个项目都需要由一个操作处理,该操作可能需要一秒钟来处理每个项目。这个项目队列可以由不同的线程添加。

我对此有几个问题 a) 允许多个消费者处理此 BlockingCollection?我注意到 GetConsumingEnumerable(),它似乎适用于单一消费者场景。有多个消费者的原因是通过命名管道实例的处理一次最多可以处理三个这样的项目,所以我想我可以有三个消费者。

b) 有没有办法检查一个项目是否在这个队列中,如果是,让调用者检查是否有一个项目要阻塞,直到项目被处理?

编辑:

根据 Jon Skeet 的回答,这里有一些示例代码来说明多个消费者作用于由单个生产者填充的 BlockingCollection,消费者使用 GetConsumingEnumerable()

static BlockingCollection<string> coll = new BlockingCollection<string>();

static void Consume()
{
    foreach (var i in coll.GetConsumingEnumerable())
    {
        Console.WriteLine(String.Format("Thread {0} Consuming: {1}",  Thread.CurrentThread.ManagedThreadId, i));
        Thread.Sleep(1000);
    }
}

static void Main(string[] args)
{
    int item = 0;

    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            coll.Add(string.Format("Item {0}", item++));
            Thread.Sleep(500);
        }
    });

    for (int i = 0; i < 2; i++)
    {
        Task.Factory.StartNew(() => Consume());
    }

    while (true) ;
}

项目在两个不同线程上运行的两个消费者之间以交错方式处理,例如

Thread 4 Consuming: Item 0
Thread 5 Consuming: Item 1
Thread 4 Consuming: Item 2
Thread 5 Consuming: Item 3
Thread 4 Consuming: Item 4

【问题讨论】:

  • 当使用GetConsumingEnumerable时,生产者调用CompleteAdding()以便消费者知道何时停止是很重要的。

标签: c# .net c#-4.0


【解决方案1】:

多个消费者可以同时调用TakeTryTake - 每个项目只会被一个消费者消费。

但是,我相信GetConsumingEnumerable 也会做你想做的事。我相信如果每个调用者都调用它,每个调用者都会得到一个单独的消费枚举,这将再次确保每个项目只被消费一次。我不确定当队列变空时会发生什么 - 我不知道 MoveNext() 然后是阻塞还是返回 false。

虽然我并没有真正关注你的第二个问题......

【讨论】:

  • 抱歉 - 在第二次阅读时似乎有点模糊。所以,问题是我希望能够确定一个项目是否在队列中进行处理(很简单,因为我可以写一个 linq 查询来检查这个)所以我不会将重复的项目添加到队列中(并防止不必要的重复处理)。此队列是通过命名管道输入 PDF 编写器的输入,该管道将 PDF 写入共享位置。
  • 现在,如果请求的项目已经在队列中,(比如我写的 HttpHandler),我希望 HttpHandler 上的调用请求阻塞,直到该项目被处理,所以我可以保证任务已经完成,并且 PDF 文件存在于磁盘上,然后再提供它。希望上下文有所帮助!
  • @pkiddie:你不也想知道该项目是否已经被处理了吗?
  • 是的,所以我要做的是在将代表 PDF 的项目添加到队列之前检查磁盘上是否存在 PDF,如果存在就提供它
  • @pkiddie:我明白了。您的处理部分可以自动尝试创建文件吗?这样,同一个项目是否多次在队列中都无关紧要 - 只有一件事会尝试处理它,因为之后处理器可能会说“它已经完成或正在进行中”。
【解决方案2】:

GetConsumingEnumerable 实际上可以安全地同时从多个消费者调用;仅当集合被标记为完成时,可枚举才完成。每个项目只消耗一次。

GetConsumingEnumerableessentially equivalent to

while (!IsCompleted)
{
  if (TryTake(out var item, Timeout.Infinite))
    yield return item;
}

加上一些取消/清理逻辑。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多