【问题标题】:await completion of BlockingCollection [closed]等待 BlockingCollection 完成 [关闭]
【发布时间】:2021-09-07 02:52:29
【问题描述】:

对于多线程应用程序,我想 await 直到 BlockingCollection 完成并为空(IsCompleted = true)。我实现了以下内容,这似乎有效。

由于它是多线程,我什至不相信自己的影子。这会是一个健壮的实现吗?

public class BlockingCollectionEx<T> : BlockingCollection<T>
{
    public Task WaitCompleted => completedManualResetEvent.Task;
    private readonly TaskCompletionSource completedManualResetEvent = new();

    public new void CompleteAdding()
    {
        base.CompleteAdding();

        lock (completedManualResetEvent)
        {
            if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }

    public new IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var item in base.GetConsumingEnumerable())
            yield return item;

        lock (completedManualResetEvent) //if GetConsumingEnumerable is used by multiple threads, the 2nd one would throw an InvalidOperationException 
        {
            if (!completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }
    public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new T Take() => throw new NotImplementedException();
    public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}

用法:

var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();

Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
        // do stuff in Task 1
});
Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
        // do stuff in Task 2
});

await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is emtpy

【问题讨论】:

  • 它看起来应该可以工作,但是有一些代码味道让我想知道是否有更好的方法来实现您的目标。例如,扩展集合类通常会导致悲伤。既然Task.Run返回一个任务,能不能不使用常规的BlockingCollection和await Task.Run(...);
  • 如果您对使用异步 API 的消耗性集合感兴趣,您可能会发现这个问题很有趣:Is there anything like asynchronous BlockingCollection<T>?
  • 为什么使用线程安全集合只是为了在单个线程中使用它?
  • @Red 示例用法已更新。我对 GetConsumingEnumerable() 的阻塞方面感兴趣/利用
  • 所以您基本上是在尝试在多个线程中处理列表中的元素,您希望每个线程(我假设)删除当前正在处理的元素,或者至少其他线程跳过它。一旦所有线程发现列表中没有其他元素,它们就会完成任务。如果是这种情况,您真的确定需要所有这些基础架构来实现它吗?

标签: c# asynchronous async-await blockingcollection concurrent-collections


【解决方案1】:

您的实现对于一般用途来说并不健壮,但对于遵守以下合同的应用程序来说可能已经足够了:

该集合必须仅由一位消费者使用,该消费者仅使用GetConsumingEnumerable 方法来使用它。

  1. 如果没有消费者,集合为空,调用CompleteAdding方法,WaitCompleted任务永远不会完成。
  2. 如果有两个或更多消费者,则枚举将失败,除一个消费者之外的所有消费者都将返回 InvalidOperationException
  3. 如果有一个消费者,但使用TakeTryTake 方法消费集合,WaitCompleted 任务将永远无法完成。

在不了解您的具体用例的情况下,我无法说您是否有正当理由请求此功能。但总的来说,等待BlockingCollection&lt;T&gt; 变为空并完成的确切时刻通常并不重要。重要的是所有消费项目的处理完成的确切时刻,这发生在收集完成之后


注意:此答案针对此问题的Revision 1 版本。

【讨论】:

  • 感谢 Theodor,我根据您的 cmets 修改了 OP 中的实现(考虑到 1 和 2 并明确将 3 标记为未实现。Wdyt?
  • @WouterVanRanst 关于Revision 2 版本的问题,在不确定的情况下,我怀疑那里潜伏着竞争条件。就个人而言,假设我需要这个功能(这不太可能发生),我不会相信生产环境中的这个实现。 Count 之类的属性并非真正用于控制执行流程。
  • @WouterVanRanst 也继承自 BlockingCollection&lt;T&gt; 可能是麻烦的来源。在具有BlockingCollection&lt;T&gt; 参数的方法中传递BlockingCollectionEx&lt;T&gt; 类的实例,就足以让所有的地狱都崩溃了。此方法将使用 base.GetConsumingEnumerable 实现而不是您的类实现。多态性不适用于new 成员。它仅适用于override 成员。
  • 最后一个论点让我信服了,谢谢。
猜你喜欢
  • 2018-04-12
  • 2021-07-17
  • 2015-02-15
  • 2020-04-07
  • 2016-06-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多