【发布时间】:2020-06-08 00:57:42
【问题描述】:
我有一个异步任务流,它是通过将异步 lambda 应用于项目流而生成的:
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
})
上面的方法AsyncEnumerable.Range和Select是从System.Linq.Async包中提供的。
我想要的结果是一个结果流,表示为IAsyncEnumerable<string>。结果必须按照与原始任务相同的顺序进行流式传输。此外,必须限制流的枚举,因此在任何给定时间都不能超过指定数量的任务处于活动状态。
我想要IAsyncEnumerable<Task<T>> 类型上的扩展方法形式的解决方案,这样我就可以多次链接它并形成一个处理管道,功能与TPL Dataflow 管道相似,但表达流畅。以下是理想扩展方法的签名:
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
同时接受 CancellationToken 作为参数将是一个不错的功能。
更新:为了完整起见,我提供了一个通过链接两次AwaitResults 方法形成的流畅处理管道的示例。此管道以 PLINQ 块开始,只是为了证明混合 PLINQ 和 Linq.Async 是可能的。
int[] results = await Partitioner
.Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
预期输出:
结果:1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20
注意:回想起来,AwaitResults 方法可能应该命名为Merge,而concurrencyLevel 参数应该命名为maxConcurrent,因为它的功能类似于Merge 运算符存在于Rx 库中。 System.Interactive.Async 包确实包含一个名为 Merge 的运算符,它产生 IAsyncEnumerable<T>s,但它的重载都没有在 IAsyncEnumerable<Task<T>> 源上运行。它在IEnumerable<IAsyncEnumerable<TSource>> 和IAsyncEnumerable<IAsyncEnumerable<TSource>> 源上运行。还可以添加参数bufferCapacity,以便显式控制等待/合并操作所需的缓冲区大小。
【问题讨论】:
-
你之前没问过这个吗?处理消息的不是
IAsyncEnumerable,而是您用来阅读和处理它们的任何东西。解决方案是 notIAsyncEnumerable<Task<T>- 这根本不会异步给你项目。您已经可以根据您的意思“节流”。每 N 个项目或秒只处理一个项目?批量转发? -
DataFlow 是一种处理流的方式 - 只需设置
BoundedCapacity=1,您就可以得到有序处理、批处理、开箱即用的可配置 DOP。渠道是另一个。await foreach是另一个。如果你想要节流,你可以创建一个异步迭代器,它从源流中读取 T 个项目,并每 n 个项目发出一个T[]。或第 N 项。您可以使用 System.Linq.Async 来简化此操作。 -
简而言之,问题是什么?即使对于
expressed fluently,你也可以编写一组扩展方法来做你想做的事,假设你决定what那是 -
BTW
no more than a specified number of tasks are active at any given time由 DOP 控制,而不是节流。 -
@PanagiotisKanavos I have asked before 用于名为
WhenEach的方法,其签名为:public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)。这不是可限制的,也不是可链接的。现在我想要一个可以像这样链接的方法:.Select().AwaitResults().Select().AwaitResults()...,每个处理块具有不同的并发级别。限制不是基于时间的。例如,concurrencyLevel = 5在任何时候都应该有最多五个任务处于活动状态。
标签: c# async-await task-parallel-library iasyncenumerable