【问题标题】:Proper way to use HttpClient within Parallel.Foreach to make a large number of POST requests在 Parallel.Foreach 中使用 HttpClient 发出大量 POST 请求的正确方法
【发布时间】:2019-06-18 22:40:45
【问题描述】:

正如您在下面的代码中所看到的,在高层次上,此代码递归地读取文件夹结构并将其内容发布到 API。应用程序是.Net core 2.1。

我有这个向 API 发送 POST 的服务。

    public class EnterpriseService
    {
        private readonly HttpClient _httpClient;

        public EnterpriseService(HttpClient httpClient)
        {
            _httpClient = httpClient;
        }

        public async Task<string> PostTransactionAsync(byte[] payload)
        {
            using (var request = new HttpRequestMessage(HttpMethod.Post, new Uri("https://www.foo.com/api/transaction")))
            {
                request.Content = new ByteArrayContent(payload);
                HttpResponseMessage response = await _httpClient.SendAsync(request);
                return await response.Content.ReadAsStringAsync();
            }
        }
    }

调用者正在以下列方式调用 PostTransactionAsync:

        protected async Task SearchFoldersAsync(List<FileStatusProperties> folders, string root, CancellationToken cancellationToken)
        {
            await Task.Run(() =>
            {
                return Parallel.ForEach(folders, async entry =>
                {
                    if (entry.Type == FileType.DIRECTORY)
                    {
                        await SearchFoldersAsync(
                            DataLakeStorage.DirectoryGetFiles($"{root}/{entry.PathSuffix}"),
                            $"{root}/{entry.PathSuffix}", cancellationToken);
                        return;
                    }

                    byte[] payload = DataLakeStorage.FileDownload($"{root}/{entry.PathSuffix}");
                    await _enterpriseService.PostTransactionAsync(payload);

                });
            }, cancellationToken);

        }

请注意,我使用的是作为单例 DI 的 HttpClient。

我还递归使用了 Parallel.Foreach。

此代码非常适用于包含 10K+ 文件的较小文件夹结构。但是当文件数量增加时(比如说文件夹中的文件达到大约 100K),我会得到这两个错误的混合。大约 20% 的请求成功。 40% 的请求最终在 _httpClient.SendAsync 调用中出现这 2 个异常。请求在 10 秒后失败。

每个套接字地址(协议/网络地址/端口)只能使用一次 通常是允许的

操作已取消。无法从传输中读取数据 连接:I/O 操作已中止,因为 线程退出或应用程序请求。 I/O 操作已 由于线程退出或应用程序请求而中止

我阅读了 HttpClient 的用法,据我所知,我没有做错任何事情。但我不确定它是否与递归 Parallel.ForEach 一起使用。

我想知道在我需要同时发出大量 http 请求的情况下处理这种情况的推荐方法是什么?

【问题讨论】:

  • 简而言之:你没有。 Parallel.ForEach 不适用于异步委托。它导致不可等待的异步无效。如果你想启动多个异步 Http 调用,那么 Select 在传入的集合上。
  • 有人能解释一下否决票吗?

标签: .net-core async-await dotnet-httpclient parallel.foreach


【解决方案1】:

Parallel 用于并行,这是一种并发形式,它使用多个线程将 CPU 绑定的工作拆分到多个内核。您想要的是异步并发,这是一种更适合同时执行多个 I/O 绑定操作的方法。

通过为每个项目启动Task(通常使用Select),然后对所有这些任务执行await Task.WhenAll,最容易实现异步并发。像这样的:

protected async Task SearchFoldersAsync(List<FileStatusProperties> folders, string root, CancellationToken cancellationToken)
{
  var tasks = folders.Select(async entry =>
  {
    if (entry.Type == FileType.DIRECTORY)
    {
      await SearchFoldersAsync(
          DataLakeStorage.DirectoryGetFiles($"{root}/{entry.PathSuffix}"),
          $"{root}/{entry.PathSuffix}", cancellationToken);
      return;
    }

    byte[] payload = DataLakeStorage.FileDownload($"{root}/{entry.PathSuffix}");
    await _enterpriseService.PostTransactionAsync(payload);
  }).ToList();
  await Task.WhenAll(tasks);
}

【讨论】:

  • '这是一种更适合同时执行多个 I/O 绑定操作的方法。' - 你能详细说明一下吗?我尝试了这种方法,性能显着下降。所以我真的很想了解是否存在不应使用并行 foreach 的情况。谢谢。
  • async 使用回调模型进行 I/O:当 I/O 完成时会通知代码。 Parallel 使用阻塞模型:调用线程被阻塞,直到操作完成。所以Parallel 使用更多的线程和内存来做同样的事情。
【解决方案2】:

我不打算讨论并行与异步...

但是这个特定的错误

每个套接字地址(协议/网络地址/端口)通常只允许使用一次

这似乎是因为从 1 个系统到另一个系统上的单个端口只能有大约 65k 连接。

假设现有服务器进程使用端口 80,您可以启动使用其他端口的其他进程。但是您需要超过 1 个 HttpClient 并且需要在它们之间进行循环或其他操作。进程过多,您可能会开始在客户端或服务器上达到打开文件描述符的限制。

【讨论】:

    猜你喜欢
    • 2015-09-19
    • 1970-01-01
    • 1970-01-01
    • 2021-06-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多