【问题标题】:How to query two IAsyncEnumerables asynchronously如何异步查询两个 IAsyncEnumerables
【发布时间】:2021-02-11 10:15:45
【问题描述】:

我有两种方法连接到Foos 的两个不同来源,它们返回两个IAsyncEnumerable<Foo>。我需要从两个来源获取所有Foos,然后才能处理它们。

问题:我想同时查询两个源(异步),即。在开始枚举Source2之前不等待Source1完成枚举。据我了解,这就是下面的方法SequentialSourcesQuery 示例中发生的情况,对吗?

对于常规任务,我会启动第一个任务,然后是第二个任务,然后调用await Task.WhenAll。但是我对如何处理IAsyncEnumerable有点困惑。

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        List<Foo> foos = new List<Foo>();

        await foreach (Foo foo1 in Source1())
        {
            foos.Add(foo1);
        }

        await foreach (Foo foo2 in Source2())
        { //doesn't start until Source1 completed the enumeration? 
            foos.Add(foo2);
        }

        return foos;
    }
}

【问题讨论】:

  • 一种解决方法是简单地启动两个新任务,然后在这些任务中运行循环。
  • 另外,从多个线程添加到列表时要小心。如讨论here
  • @smoksnes 好的,谢谢,我会看看。实际上这两个列表是不同的,我只是过度简化了这个例子。

标签: c# async-await iasyncenumerable


【解决方案1】:

您可以利用库System.Linq.AsyncSystem.Interactive.Async(由属于.NET 基金会的RxTeam 拥有)。它们包含 MergeToListAsync 等运算符,可以轻松解决您的问题。

// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
    this IAsyncEnumerable<TSource> source,
    CancellationToken cancellationToken = default);

把所有东西放在一起:

public Task<List<Foo>> SequentialSourcesQuery()
{
    return AsyncEnumerableEx.Merge(Source1(), Source2()).ToListAsync().AsTask();
}

意识到这些库的重点是提供一组丰富的功能,而不是性能或效率。因此,如果一流的性能对您的用例很重要,niki.kante 的 solution 很可能会胜过上述基于运算符的方法。

【讨论】:

  • Rx 团队在这一点上实际上是 Microsoft 的一部分(哎呀,主要贡献者之一是 .NET 基金会的领导者!)这意味着如果您向他们提出性能或效率问题,这些很可能会得到解决。此外,Rx 存储库经过了令人难以置信的良好测试 - 如果您正在寻找可靠的解决方案,那么这就是正确的解决方案。
  • @IanKemp 总的来说,我对 RxTeam 拥有的库的性能感到满意。我的意思是,我期望实际应用程序具有足够好的性能特征(无论如何,它们通常会在其中包含各种低效率)。我不期望性能非常糟糕,但我也不期望有针对性和精心制作的手工实现的原始性能。您可以查看this 问题以了解我的意思。
【解决方案2】:

如果您有两个IAsyncEnumerable&lt;T&gt; 作为源并且不关心传入数据的顺序,则可以使用如下方法来交错数据。

public static class AsyncEnumerableExt
{
    public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first, IAsyncEnumerable<T> second)
    {
        var enum1 = first.GetAsyncEnumerator();
        var enum2 = second.GetAsyncEnumerator();

        var nextWait1 = enum1.MoveNextAsync().AsTask();
        var nextWait2 = enum2.MoveNextAsync().AsTask();

        do
        {
            var task = await Task.WhenAny(nextWait1, nextWait2).ConfigureAwait(false);

            if (task == nextWait1)
            {
                yield return enum1.Current;

                nextWait1 = !await task.ConfigureAwait(false) ? null : enum1.MoveNextAsync().AsTask();
            }
            else if (task == nextWait2)
            {
                yield return enum2.Current;

                nextWait2 = !await task.ConfigureAwait(false) ? null : enum2.MoveNextAsync().AsTask();
            }
        } while (nextWait1 != null && nextWait2 != null);

        while (nextWait1 != null)
        {
            if (!await nextWait1.ConfigureAwait(false))
            {
                nextWait1 = null;
            }
            else
            {
                yield return enum1.Current;
                nextWait1 = enum1.MoveNextAsync().AsTask();
            }
        }

        while (nextWait2 != null)
        {
            if (!await nextWait2.ConfigureAwait(false))
            {
                nextWait2 = null;
            }
            else
            {
                yield return enum2.Current;
                nextWait2 = enum2.MoveNextAsync().AsTask();
            }
        }
    }
}

然后您可以使用一个await foreach 使用数据并将数据存储在列表中。

【讨论】:

  • 哇,这真的很有趣,而且相当复杂!我永远不会自己管理这个。与将 IAsyncEnumerable 包装成两个任务的其他答案的主要区别在于,您可以在数据进入时对其进行操作,对吗?
  • @XavierAM 不会因为同步不当而随机抛出异常。
  • 这个实现与System.Interactive.Asyncis implemented中的Merge运算符非常相似。
  • @XavierAM 请选择 Theodor 的解决方案而不是我的解决方案。
【解决方案3】:

您可以编写另一个返回 Task 的异步本地方法。

Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
        List<Foo> foos = new List<Foo>();
        await foreach (Foo foo1 in values)
        {
            foos.Add(foo1);
        }        
        return foos;
};

然后这样称呼它:

Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());

await Task.WhenAll(task1, task2);

整个代码是:

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        var asyncEnumerator = Source1().GetAsyncEnumerator();
        Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
            List<Foo> foos2 = new List<Foo>();
            await foreach (Foo foo in values)
            {
                foos2.Add(foo);
            }        
            return foos2;
        };
        
        Task<List<Foo>> task1 = readValues(Source1());
        Task<List<Foo>> task2 = readValues(Source2());
        
        await Task.WhenAll(task1, task2);
        
        List<Foo> foos = new List<Foo>(task1.Result.Count + task2.Result.Count);
        foos.AddRange(task1.Result);
        foos.AddRange(task2.Result);

        return foos;
    }
}

【讨论】:

  • @XavierAM ConcurrentBag&lt;T&gt; 是一个 very specialized 集合。除非您正在处理混合的生产者-消费者场景,否则您应该更喜欢 ConcurrentQueue&lt;T&gt;
  • @Servy 感谢您的批评。解决方案是针对 OP 的基本问题。下次我会尝试做出更完整的答案。我更新了修复并发的答案,并按照 pinkfloydx33 的建议添加了本地方法。
  • @TheodorZoulias 好的,谢谢,我不知道 ConcurrentBag 和 ConcurrentQueue 之间的细微差别。
  • @XavierAM 现在你是知道这个小秘密的小圈子里的一员。 ?
  • 为什么似乎没有使用的无关asyncEnumerator
猜你喜欢
  • 2014-12-05
  • 2017-07-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-20
  • 2014-03-28
  • 2018-02-12
  • 1970-01-01
相关资源
最近更新 更多