【问题标题】:How to await all results from an IAsyncEnumerable<>?如何等待来自 IAsyncEnumerable<> 的所有结果?
【发布时间】:2020-03-13 20:34:23
【问题描述】:

我正在修改 C# 8.0 中的新 IAsyncEnumerable&lt;T&gt; 内容。假设我在某处想使用某种方法:

public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }

我知道我可以将它与await foreach... 语法一起使用。但是,假设我的消费者需要从该函数获得所有结果,然后才能继续。在继续之前等待所有结果的最佳语法是什么?换句话说,我希望能够执行以下操作:

// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync(); 

这样做的正确方法是什么?

【问题讨论】:

  • Task.WaitAll() ??
  • @AzharKhorasany 该语法是什么样的?我已经修改了Task.WhenAll(),但我无法让它工作。
  • await foreach (var item in SomeBlackBoxFunctionAsync&lt;T&gt;()) myList.Add(item);
  • 从您的方法中返回任务,然后等待所有。
  • 为什么要在处理结果之前消耗整个流?根据定义,异步流可​​能永远不会结束

标签: c# c#-8.0 iasyncenumerable


【解决方案1】:

首先警告:根据定义,异步流可​​能永远不会结束并一直产生结果,直到应用程序终止。这已经在例如 SignalR 或 gRPC 中使用。轮询循环也以这种方式工作。

在异步流上使用 ToListAsync 可能会产生意想不到的后果。


System.Linq.Async 包已经提供了这样的运算符。

通过ToListAsync 可以使用整个流。代码*看似简单,但隐藏了一些有趣的问题:

public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
{
    if (source == null)
        throw Error.ArgumentNull(nameof(source));

    if (source is IAsyncIListProvider<TSource> listProvider)
        return listProvider.ToListAsync(cancellationToken);

    return Core(source, cancellationToken);

    static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        var list = new List<TSource>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            list.Add(item);
        }

        return list;
    }
}

首先,它返回一个ValueTask。其次,它确保观察到取消并使用ConfigureAwait(false),以防止死锁。最后,如果源已经提供了自己的ToListAsync 实现,则运营商会遵照执行。

【讨论】:

  • 注意:IAsyncIListProvider 接口包含在System.Linq.Async 包中(以及其他两个不起眼的接口),并且没有由任何公开可见的类实现。
【解决方案2】:

作为一个选项,您可以使用ToArrayAsync 扩展方法,在System.Linq.Async 包中定义

public static ValueTask<TSource[]> ToArrayAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)

根据定义,它扩展了IAsyncEnumerable 接口

【讨论】:

    【解决方案3】:

    根据@DmitryBychenko 的评论,我写了一个扩展来做我想要的:

        public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
        {
            if (null == asyncEnumerable)
                throw new ArgumentNullException(nameof(asyncEnumerable));  
    
            var list = new List<T>();
            await foreach (var t in asyncEnumerable)
            {
                list.Add(t);
            }
    
            return list;
        }
    

    我只是有点惊讶这不是 C# 8.0 原生提供的......这似乎是一个非常明显的需求。

    【讨论】:

    • 看看这个 GitHub thread,基本上有一个基于社区的包和repo,带有对 AsyncEnumerable 的 linq 支持。或者使用 Rx 提供的System.Linq.Async
    • 请为asyncEnumerable 添加验证(因为AllResultsAsyncpublic 方法) - 它不能是null 并且有我的+1跨度>
    • 作为 System.Linq.Async 包的一部分提供。
    • @PanagiotisKanavos 很高兴听到它! System.Linq.Async 中调用的方法是什么?如果它在那里,那应该是我问题的正确答案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-06-08
    • 1970-01-01
    • 1970-01-01
    • 2021-01-21
    • 2014-06-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多