【问题标题】:How to enumerate an IAsyncEnumerable<T> and invoke an async action for each element, allowing concurrency for each iteration/action pair?如何枚举 IAsyncEnumerable<T> 并为每个元素调用异步操作,允许每个迭代/操作对并发?
【发布时间】:2021-02-14 04:02:15
【问题描述】:

我有一个 IAsyncEnumerable&lt;string&gt; 流,其中包含从 Web 下载的数据,我想将每条数据异步保存在 SQL 数据库中。所以我使用了来自System.Linq.Async 库的ForEachAwaitAsync 扩展方法。我的问题是下载和保存每条数据是按顺序进行的,而我希望它同时发生。

澄清一下,我不想同时下载多个数据,也不想同时保存多个数据。我想要的是,当我在数据库中保存一条数据时,下一条数据应该同时从网络上下载。

以下是我当前解决方案的最小(人为)示例。下载五个项目,然后将其保存在数据库中。下载每个项目需要 1 秒,保存则需要 1 秒:

async IAsyncEnumerable<string> GetDataFromWeb()
{
    foreach (var item in Enumerable.Range(1, 5))
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Downloading #{item}");
        await Task.Delay(1000); // Simulate an I/O-bound operation
        yield return item.ToString();
    }
}

var stopwatch = Stopwatch.StartNew();
await GetDataFromWeb().ForEachAwaitAsync(async item =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Saving #{item}");
    await Task.Delay(1000); // Simulate an I/O-bound operation
});
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");

代码正在运行,但不是我想要的方式。总持续时间约为 10 秒,而不是理想的约 6 秒。

实际不受欢迎的输出:

04:55:50.526 > Downloading #1
04:55:51.595 > Saving #1
04:55:52.598 > Downloading #2
04:55:53.609 > Saving #2
04:55:54.615 > Downloading #3
04:55:55.616 > Saving #3
04:55:56.617 > Downloading #4
04:55:57.619 > Saving #4
04:55:58.621 > Downloading #5
04:55:59.622 > Saving #5
Duration: 10,115 msec

假设的理想输出:

04:55:50.000 > Downloading #1
04:55:51.000 > Saving #1
04:55:51.000 > Downloading #2
04:55:52.000 > Saving #2
04:55:52.000 > Downloading #3
04:55:53.000 > Saving #3
04:55:53.000 > Downloading #4
04:55:54.000 > Saving #4
04:55:54.000 > Downloading #5
04:55:55.000 > Saving #5
Duration: 6,000 msec

我正在考虑实现一个名为ForEachConcurrentAsync 的自定义扩展方法,它与上述ForEachAwaitAsync 方法具有相同的签名,但其行为允许同时对项目进行枚举和操作。下面是这个方法的一个存根:

/// <summary>
/// Invokes and awaits an asynchronous action on each element in the source sequence.
/// Each action is awaited concurrently with fetching the sequence's next element.
/// </summary>
public static Task ForEachConcurrentAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task> action,
    CancellationToken cancellationToken = default)
{
    // What to do?
}

如何实现这个功能?

附加要求:

  1. 在取消或失败的情况下泄漏正在运行的任务是不可接受的。当方法完成时,所有启动的任务都应该完成。
  2. 在枚举和操作都失败的极端情况下,应该只传播两个异常中的一个,并且任何一个都可以。
  3. 该方法应该是真正异步的,并且不应阻塞当前线程(除非action 参数包含阻塞代码,但这是调用者的责任来防止)。

说明:

  1. 如果保存数据的时间比从网络下载数据的时间长,该方法应该继续提前下载更多项目。最多只能预先下载一份数据,而保存前一份数据。

  2. 带有网络数据的IAsyncEnumerable&lt;string&gt; 是这个问题的起点。我不想更改IAsyncEnumerable&lt;string&gt; 的生成器方法。我想对它的元素采取行动(通过将它们保存到数据库中),同时枚举可枚举的元素。

【问题讨论】:

  • 您需要从action 调用中收集任务并在最后执行Task.WhenAll
  • @juharr 理想情况下,我希望避免在枚举期间跟踪所有任务。 IAsyncEnumerable&lt;T&gt; 理论上可以发出无限的元素。
  • 如果你真的获得了无限的项目集合,那么代码将永远不会完成,但要处理它,你只需要一个缓冲区,一旦达到限制,你可以执行 WhenAny 然后删除完成的任务。因为您可以在第一次保存之前完成所有下载,因此除非您想触发并忘记它们,否则无法在不以某种方式跟踪任务的情况下继续迭代集合。
  • @juharr 是的,保持有限的任务缓冲区当然是可能的。我不知道这将如何帮助我实现我想要的行为。关于“在第一次保存之前完成所有下载”,这不是我想要的功能。最多只能下载一条数据,保存前一条。
  • 我理论上是说从集合中获取项目的时间可能远少于执行操作的时间,在这种情况下,我假设您希望按顺序而不是并行获取项目,但您希望这些操作不会延迟获取下一个项目。如果您说要等到第一个项目被保存后再下载第二个项目,那么您对需要多长时间的期望是完全错误的。

标签: c# concurrency async-await iasyncenumerable


【解决方案1】:

听起来你只需要跟踪前一个动作的任务并在下一个动作任务之前等待它。

public static async Task ForEachConcurrentAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task> action,
    CancellationToken cancellationToken = default)
{
    Task previous = null;
    try
    {
        await source.ForEachAwaitAsync(async item =>
        {
            if(previous != null)
            {
                await previous;
            }

            previous = action(item);
        });
    }
    finally
    {
        if(previous != null)
        {
            await previous;
        }
    }
}

剩下的就是添加取消代码。

【讨论】:

  • 感谢 juharr 的回答。它很好地涵盖了问题的基本功能!我不能接受它,因为它不满足问题的第一个附加要求。如果source IAsyncEnumerable 失败,一个正在运行的任务可能会被抛在后面,以一种即发即弃的方式运行而未被观察到。
  • 我添加了一个 try finally 来处理等待上一个任务发生错误的情况。
  • 是的,现在它可以根据要求完美运行了!我希望它不依赖于 System.Linq.Async 库,也希望包含 CancellationToken 功能,但由于已经满足要求,我接受了答案。
【解决方案2】:

这是我的解决方案。
我必须将序列更改为数组才能访问下一个元素。
不确定它是否符合您填充数组的要求。

这个想法是在返回当前项目之前开始下载下一个项目。

    private static async Task Main(string[] args)
    {
        var stopwatch = Stopwatch.StartNew();
        await foreach (var item in GetDataFromWebAsync())
        {
            Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Saving #{item}");
            await Task.Delay(1000); // Simulate an I/O-bound operation

        }

        Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    private static async IAsyncEnumerable<string> GetDataFromWebAsync()
    {
        var items = Enumerable
            .Range(1, 5)
            .Select(x => x.ToString())
            .ToArray();

        Task<string> next = null;

        for (var i = 0; i < items.Length; i++)
        {
            var current = next is null 
                ? await DownloadItemAsync(items[i]) 
                : await next;

            var nextIndex = i + 1;
            next = StarNextDownloadAsync(items, nextIndex);
            
            yield return current;
        }
    }

    private static async Task<string> StarNextDownloadAsync(IReadOnlyList<string> items, int nextIndex)
    {
        return nextIndex < items.Count
            ? await DownloadItemAsync(items[nextIndex])
            : null;
    }

    private static async Task<string> DownloadItemAsync(string item)
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Downloading #{item}");
        await Task.Delay(1000);
        return item;
    }

控制台输出:

15:57:26.226 > Downloading #1
15:57:27.301 > Downloading #2
15:57:27.302 > Saving #1
15:57:28.306 > Downloading #3
15:57:28.307 > Saving #2
15:57:29.312 > Downloading #4
15:57:29.340 > Saving #3
15:57:30.344 > Downloading #5
15:57:30.347 > Saving #4
15:57:31.359 > Saving #5
Duration: 6 174 msec

【讨论】:

  • 感谢 Stefan 的回答。它似乎工作得很好。我不喜欢这个解决方案的是,逻辑在IAsyncEnumerable&lt;string&gt; 的迭代器方法中被拦截。我真的不想改变这种方法。我的最终目标是拥有一个通用的ForEachConcurrentAsync 方法,我可以用它来解决以IAsyncEnumerable&lt;T&gt; 为起点的各种问题。我已经编辑了这个问题并对此进行了澄清。无论如何我都赞成你的回答,因为它似乎解决了这个特殊问题。
【解决方案3】:

这里是一个比较简单的实现,不依赖System.Linq.Async包:

/// <summary>
/// Invokes and awaits an asynchronous action on each element in the source sequence.
/// Each action is awaited concurrently with fetching the sequence's next element.
/// </summary>
public static async Task ForEachConcurrentAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task> action,
    CancellationToken cancellationToken = default)
{
    var enumerator = source.GetAsyncEnumerator(cancellationToken);
    await using (enumerator.ConfigureAwait(false))
    {
        if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) return;
        while (true)
        {
            Task task = action(enumerator.Current);
            bool moved;
            try { moved = await enumerator.MoveNextAsync().ConfigureAwait(false); }
            finally { await task.ConfigureAwait(false); }
            if (!moved) break;
        }
    }
}

为了简单起见,使用try/finally 块而不是使用Task.WhenAll 等待两个并发任务。缺点是如果两个并发操作都失败了,MoveNextAsync 的错误将不会被传播。

【讨论】:

    猜你喜欢
    • 2020-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-03
    • 1970-01-01
    • 2013-10-23
    • 2016-04-06
    • 1970-01-01
    相关资源
    最近更新 更多