【发布时间】:2021-09-07 02:52:29
【问题描述】:
对于多线程应用程序,我想 await 直到 BlockingCollection 完成并为空(IsCompleted = true)。我实现了以下内容,这似乎有效。
由于它是多线程,我什至不相信自己的影子。这会是一个健壮的实现吗?
public class BlockingCollectionEx<T> : BlockingCollection<T>
{
public Task WaitCompleted => completedManualResetEvent.Task;
private readonly TaskCompletionSource completedManualResetEvent = new();
public new void CompleteAdding()
{
base.CompleteAdding();
lock (completedManualResetEvent)
{
if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable()
{
foreach (var item in base.GetConsumingEnumerable())
yield return item;
lock (completedManualResetEvent) //if GetConsumingEnumerable is used by multiple threads, the 2nd one would throw an InvalidOperationException
{
if (!completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();
public new T Take() => throw new NotImplementedException();
public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}
用法:
var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 1
});
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 2
});
await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is emtpy
【问题讨论】:
-
它看起来应该可以工作,但是有一些代码味道让我想知道是否有更好的方法来实现您的目标。例如,扩展集合类通常会导致悲伤。既然
Task.Run返回一个任务,能不能不使用常规的BlockingCollection和await Task.Run(...);? -
如果您对使用异步 API 的消耗性集合感兴趣,您可能会发现这个问题很有趣:Is there anything like asynchronous BlockingCollection<T>?
-
为什么使用线程安全集合只是为了在单个线程中使用它?
-
@Red 示例用法已更新。我对 GetConsumingEnumerable() 的阻塞方面感兴趣/利用
-
所以您基本上是在尝试在多个线程中处理列表中的元素,您希望每个线程(我假设)删除当前正在处理的元素,或者至少其他线程跳过它。一旦所有线程发现列表中没有其他元素,它们就会完成任务。如果是这种情况,您真的确定需要所有这些基础架构来实现它吗?
标签: c# asynchronous async-await blockingcollection concurrent-collections