【问题标题】:How to await the results of an IAsyncEnumerable<Task<T>>, with a specific level of concurrency如何等待具有特定并发级别的 IAsyncEnumerable<Task<T>> 的结果
【发布时间】:2020-06-08 00:57:42
【问题描述】:

我有一个异步任务流,它是通过将异步 lambda 应用于项目流而生成的:

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
})

上面的方法AsyncEnumerable.RangeSelect是从System.Linq.Async包中提供的。

我想要的结果是一个结果流,表示为IAsyncEnumerable&lt;string&gt;。结果必须按照与原始任务相同的顺序进行流式传输。此外,必须限制流的枚举,因此在任何给定时间都不能超过指定数量的任务处于活动状态。

我想要IAsyncEnumerable&lt;Task&lt;T&gt;&gt; 类型上的扩展方法形式的解决方案,这样我就可以多次链接它并形成一个处理管道,功能与TPL Dataflow 管道相似,但表达流畅。以下是理想扩展方法的签名:

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);

同时接受 CancellationToken 作为参数将是一个不错的功能。


更新:为了完整起见,我提供了一个通过链接两次AwaitResults 方法形成的流畅处理管道的示例。此管道以 PLINQ 块开始,只是为了证明混合 PLINQ 和 Linq.Async 是可能的。

int[] results = await Partitioner
    .Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();

Console.WriteLine($"Results: {String.Join(", ", results)}");

预期输出:

结果:1​​、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20


注意:回想起来,AwaitResults 方法可能应该命名为Merge,而concurrencyLevel 参数应该命名为maxConcurrent,因为它的功能类似于Merge 运算符存在于Rx 库中。 System.Interactive.Async 包确实包含一个名为 Merge 的运算符,它产生 IAsyncEnumerable&lt;T&gt;s,但它的重载都没有在 IAsyncEnumerable&lt;Task&lt;T&gt;&gt; 源上运行。它在IEnumerable&lt;IAsyncEnumerable&lt;TSource&gt;&gt;IAsyncEnumerable&lt;IAsyncEnumerable&lt;TSource&gt;&gt; 源上运行。还可以添加参数bufferCapacity,以便显式控制等待/合并操作所需的缓冲区大小。

【问题讨论】:

  • 你之前没问过这个吗?处理消息的不是IAsyncEnumerable,而是您用来阅读和处理它们的任何东西。解决方案是 not IAsyncEnumerable&lt;Task&lt;T&gt; - 这根本不会异步给你项目。您已经可以根据您的意思“节流”。每 N 个项目或秒只处理一个项目?批量转发?
  • DataFlow 一种处理流的方式 - 只需设置BoundedCapacity=1,您就可以得到有序处理、批处理、开箱即用的可配置 DOP。渠道是另一个。 await foreach 是另一个。如果你想要节流,你可以创建一个异步迭代器,它从源流中读取 T 个项目,并每 n 个项目发出一个 T[]。或第 N 项。您可以使用 System.Linq.Async 来简化此操作。
  • 简而言之,问题是什么?即使对于expressed fluently,你也可以编写一组扩展方法来做你想做的事,假设你决定what那是
  • BTW no more than a specified number of tasks are active at any given time 由 DOP 控制,而不是节流。
  • @PanagiotisKanavos I have asked before 用于名为 WhenEach 的方法,其签名为:public static async IAsyncEnumerable&lt;TResult&gt; WhenEach&lt;TResult&gt;(Task&lt;TResult&gt;[] tasks)。这不是可限制的,也不是可链接的。现在我想要一个可以像这样链接的方法:.Select().AwaitResults().Select().AwaitResults()...,每个处理块具有不同的并发级别。限制不是基于时间的。例如,concurrencyLevel = 5 在任何时候都应该有最多五个任务处于活动状态。

标签: c# async-await task-parallel-library iasyncenumerable


【解决方案1】:

这是我对AwaitResults 方法的实现。它基于用于控制并发级别的SemaphoreSlim,以及用作异步队列的Channel&lt;Task&lt;TResult&gt;&gt;。源IAsyncEnumerable&lt;Task&lt;TResult&gt;&gt; 的枚举发生在一个即发即弃的任务(馈线)中,它将热任务推送到通道。它还为每个任务附加一个延续,释放信号量。

该方法的最后一部分是 yielding 循环,其中任务从通道中一个一个地出队,然后依次等待。这样,结果的生成顺序与源流中的任务顺序相同。

此实现要求每个任务等待两次,这意味着它不能用于IAsyncEnumerable&lt;ValueTask&lt;TResult&gt;&gt; 类型的源,因为ValueTask can only be awaited once

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel = 1,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (concurrencyLevel < 1)
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

    var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
    var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
    var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
    var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);

    // Feeder task: fire and forget
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var task in source
                .WithCancellation(completionCts.Token).ConfigureAwait(false))
            {
                HandleTaskCompletion(task);
                await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
                    .ConfigureAwait(false);
                await semaphore.WaitAsync(completionCts.Token)
                    .ConfigureAwait(false); // Acquire before MoveNextAsync
            }
            tasksChannel.Writer.Complete();
        }
        catch (Exception ex)
        {
            tasksChannel.Writer.Complete(ex);
        }
    });

    async void HandleTaskCompletion(Task task)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // Ignore exceptions here
        }
        finally
        {
            semaphore.Release();
        }
    }

    try
    {
        while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            while (tasksChannel.Reader.TryRead(out var task))
            {
                yield return await task.ConfigureAwait(false);
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    finally // Happens when the caller disposes the output enumerator
    {
        completionCts.Cancel();
    }
}

一个重要的细节是最终产生循环周围的 try-finally 块。这对于方法调用者过早放弃对结果流的枚举的情况是必需的。在这种情况下,源流的枚举也应该终止,并且该终止使用CancellationTokenSource 向后传播。没有它,feeder 任务永远不会完成,对象永远不会被垃圾收集,内存也会泄漏。

注意:取消cancellationToken 可能不会立即取消整个操作。为了获得最大的响应速度,应该使用相同的cancellationToken 来取消单个任务。

【讨论】:

  • 你能解释一下为什么馈线任务永远不会完成吗?一旦IAsyncEnumerable source 用完了要生成的项目,它会不会完成?
  • @CodeMonkey 当然。 Channel 已初始化为 1000 的容量,以防止缓冲过多任务的可能性。这可能发生在一个非常慢的任务后面跟着无数个闪电般的快速任务(结果是有序的,所以快速任务的结果必须在产生慢速任务的结果之后产生),或者如果结果IAsyncEnumerable 消耗它的速度非常慢。所以如果source有超过1000个task,而consumer提前放弃枚举,feeder就会卡住。因此需要 try/finally 块。
  • 谢谢,有道理,我错过了 EnumeratorCancelation 属性。
  • 回想起来,这个实现有一个重大缺陷。如果Task 失败,则生成的IAsyncEnumerable&lt;TResult&gt; 可能会在下一个Task 仍在运行时传播异常,从而导致泄漏的即发即弃任务。
猜你喜欢
  • 2020-03-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多