【问题标题】:How to await multiple IAsyncEnumerable如何等待多个 IAsyncEnumerable
【发布时间】:2019-12-27 11:55:00
【问题描述】:

我们有这样的代码:

var intList = new List<int>{1,2,3};
var asyncEnumerables = intList.Select(Foo);

private async IAsyncEnumerable<int> Foo(int a)
{
  while (true)
  {
    await Task.Delay(5000);
    yield return a;
  } 
}

我需要为每个asyncEnumerable 的条目启动await foreach。每个循环迭代都应该相互等待,当每次迭代完成时,我需要收集每个迭代的数据并通过另一种方法处理。

我可以通过 TPL 以某种方式实现吗?否则,你不能给我一些想法吗?

【问题讨论】:

  • 实际上想做什么?您可以使用 System.Linq.Async 中的运算符(如 Concat 或 Zip)将多个异步流合并为一个,并使用例如 Aggregate 或 Sum 来处理数据
  • 您可以修改Zip 的代码,例如处理两个以上的异步流,或者多次使用 Zip
  • 所以预期的输出是IAsyncEnumerable&lt;int[]&gt;,每 15,000 毫秒产生数组 [1, 2, 3]?
  • @TheodorZoulias 完全正确。
  • @PanagiotisKanavos 谢谢!我试试看。

标签: c# .net .net-core c#-8.0 iasyncenumerable


【解决方案1】:

对我有用的是this repo(81 行)中的Zip 函数

我就是这样用的

var intList = new List<int> { 1, 2, 3 };
var asyncEnumerables = intList.Select(RunAsyncIterations);
var enumerableToIterate = async_enumerable_dotnet.AsyncEnumerable.Zip(s => s, asyncEnumerables.ToArray());

await foreach (int[] enumerablesConcatenation in enumerableToIterate)
{
    Console.WriteLine(enumerablesConcatenation.Sum()); //Sum returns 6
    await Task.Delay(2000);
}

static async IAsyncEnumerable<int> RunAsyncIterations(int i)
{
    while (true)
        yield return i;
}

【讨论】:

    【解决方案2】:

    这是您可以使用的通用方法Zip,实现为iteratorcancellationTokenEnumeratorCancellation 属性修饰,因此生成的 IAsyncEnumerableWithCancellation 友好的。

    using System.Runtime.CompilerServices;
    
    public static async IAsyncEnumerable<TSource[]> Zip<TSource>(
        IEnumerable<IAsyncEnumerable<TSource>> sources,
        [EnumeratorCancellation]CancellationToken cancellationToken = default)
    {
        var enumerators = sources
            .Select(x => x.GetAsyncEnumerator(cancellationToken))
            .ToArray();
        try
        {
            while (true)
            {
                var array = new TSource[enumerators.Length];
                for (int i = 0; i < enumerators.Length; i++)
                {
                    if (!await enumerators[i].MoveNextAsync()) yield break;
                    array[i] = enumerators[i].Current;
                }
                yield return array;
            }
        }
        finally
        {
            foreach (var enumerator in enumerators)
            {
                await enumerator.DisposeAsync();
            }
        }
    }
    

    使用示例:

    await foreach (int[] result in Zip(asyncEnumerables))
    {
        Console.WriteLine($"Result: {String.Join(", ", result)}");
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-03-13
      • 2020-12-02
      • 2018-04-23
      • 2020-08-31
      • 2018-02-24
      • 2019-05-18
      • 1970-01-01
      • 2020-06-08
      相关资源
      最近更新 更多