【问题标题】:await thousands of Tasks等待数千个任务
【发布时间】:2016-10-19 07:02:05
【问题描述】:

我有一个转换一些数据的应用程序,通常有 1.000 - 30.000 个文件。

我需要做 3 个步骤:

  1. 复制一个文件(替换其中的一些文本)
  2. 使用 WebClient 发出 Web 请求以下载文件(我将复制的文件发送到 WebServer,WebServer 将文件转换为另一种格式)
  3. 获取下载的文件并更改部分内容

所以所有三个步骤都包括一些 I/O,我使用了 async/await 方法:

var tasks = files.Select(async (file) =>
{
    Item item = await createtempFile(file).ConfigureAwait(false);
    await convert(item).ConfigureAwait(false);
    await clean(item).ConfigureAwait(false);
}).ToList();

await Task.WhenAll(tasks).ConfigureAwait(false);

我不知道这是否是最佳做法,因为我创建了超过一千个任务。我考虑过拆分三个步骤,例如:

List<Item> items = new List<Item>();
var tasks = files.Select(async (file) =>
{
    Item item = await createtempFile(file, ext).ConfigureAwait(false);
    lock(items)
        items.Add(item);
}).ToList();

await Task.WhenAll(tasks).ConfigureAwait(false);

var tasks = items.Select(async (item) =>
{
    await convert(item, baseAddress, ext).ConfigureAwait(false);
}).ToList();

await Task.WhenAll(tasks).ConfigureAwait(false);

var tasks = items.Select(async (item) =>
{
    await clean(targetFile, item.Doctype, ext).ConfigureAwait(false);
}).ToList();

await Task.WhenAll(tasks).ConfigureAwait(false);

但这似乎并没有更好或更快,因为我创建了 3 倍于数千个任务。

我应该限制任务的创建吗?像 100 个任务的块? 还是我只是想多了,创建上千个任务就好了。

CPU 以 2-4% 的峰值空闲,所以我考虑了太多的等待或上下文切换。

也许 WebRequest 调用太多,因为 WebServer/WebService 无法同时处理数千个请求,我应该只限制 WebRequests?

我已经在 app.config 文件中增加了 .NET maxconnection。

【问题讨论】:

  • 问题是什么?
  • 您可以使用Parallel.ForEach自动分发并行作品。 Parallel.ForEach 包含 Partitioner 以避免产生不必要的任务。 msdn.microsoft.com/en-us/library/…
  • await 并不暗示您启动线程并并行工作。所以我认为你必须重新考虑你的想法,让它成为一个真正的线程应用程序。
  • @NiyokoYuliawan Parallel.Foreach 用于 CPU 绑定的工作,而不是 IO 绑定的工作。
  • 是的,你想多了。任务不是线程。它们是小而便宜的包装纸,“数千”是花生。

标签: c# asynchronous async-await


【解决方案1】:

可以并行执行异步操作限制并发操作的数量。有一个很酷的扩展方法,它不是 .Net 框架的一部分

/// <summary>
/// Enumerates a collection in parallel and calls an async method on each item. Useful for making 
/// parallel async calls, e.g. independent web requests when the degree of parallelism needs to be
/// limited.
/// </summary>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int degreeOfParalellism, Func<T, Task> action)
{
    return Task.WhenAll(Partitioner.Create(source).GetPartitions(degreeOfParalellism).Select(partition => Task.Run(async () =>
    {
        using (partition)
            while (partition.MoveNext())
                await action(partition.Current);
    })));
}

这样称呼它:

var files = new List<string> {"one", "two", "three"};
await files.ForEachAsync(5, async file =>
{
   // do async stuff here with the file
   await Task.Delay(1000);
});

【讨论】:

  • 谢谢,我不知道这种实现。我总是使用 SemaphoreSlim 的大小来限制我的工作。
【解决方案2】:

正如评论者正确指出的那样,您想多了。 .NET 运行时跟踪数千个任务绝对没有问题。

但是,您可能需要考虑使用 TPL 数据流管道,这将使您能够轻松地为管道中的不同操作(“块”)设置不同的并发级别。

【讨论】:

    猜你喜欢
    • 2016-06-04
    • 1970-01-01
    • 2015-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多