【问题标题】:HttpClient GetAsync ThreadPool StarvationHttpClient GetAsync 线程池饥饿
【发布时间】:2016-06-23 15:03:38
【问题描述】:

我们有一个面向微服务的后端堆栈。所有的微服务都建立在Nancy之上,并使用topshelf注册为windows服务。

处理大部分流量 (~5000 req/s) 的服务之一开始在 8 台服务器中的 3 台上出现线程池饥饿问题。

这是我们在到达特定端点时遇到的异常:

System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
   at System.Net.HttpWebRequest.BeginGetResponse(AsyncCallback callback, Object state)
   at System.Net.Http.HttpClientHandler.StartGettingResponse(RequestState state)
   at System.Net.Http.HttpClientHandler.StartRequest(Object obj)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at RandomNamedClient.<GetProductBySkuAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at ProductService.<GetBySkuAsync>d__3.MoveNext() in ...\ProductService.cs:line 34
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at ProductModule.<>c__DisplayClass15.<<.ctor>b__b>d__1d.MoveNext() in ...\ProductModule.cs:line 32

此端点调用另一个服务(不在我的团队的域中)以获取产品数据。实现如下:

Get["/product/sku/{sku}", true] = async (parameters, ctx) =>
{
    string sku = parameters.sku;
    var product = await productService.GetBySkuAsync(sku);
    return Response.AsJson(new ProductRepresentation(product));
};

ProductService.GetBySkuAsync(string sku) 实现:

public async Task<Product> GetBySkuAsync(string sku)
{
    var productDto = await randomNamedClient.GetProductBySkuAsync(sku);
    if (productDto == null)
    {
        throw new ProductDtoNotFoundException("sku", sku);
    }

    var variantDto = productDto.VariantList.FirstOrDefault(v => v.Sku == sku);

    if (variantDto == null)
    {
        throw new ProductVariantDtoNotFoundException("sku", sku);
    }

    return MapVariantDtoToProduct(variantDto, productDto);
}

RandomNamedClient.GetProductBySkuAsync(string sku) 实现(来自内部包):

public async Task<ProductDto> GetProductBySkuAsync(string sku)
{
  HttpResponseMessage result = await this._serviceClient.GetAsync("Product?Sku=" + sku);
  return result == null || result.StatusCode != HttpStatusCode.OK ? (ProductDto) null : this.Decompress<ProductDto>(result);
}

RandomNamedClient.Decompress&lt;T&gt;(HttpResponseMessage response) 实现:

private T Decompress<T>(HttpResponseMessage response)
{
  if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
    return HttpContentExtensions.ReadAsAsync<T>(response.Content).Result;
  using (GZipStream gzipStream = new GZipStream((Stream) new MemoryStream(response.Content.ReadAsByteArrayAsync().Result), CompressionMode.Decompress))
  {
    byte[] buffer = new byte[8192];
    using (MemoryStream memoryStream = new MemoryStream())
    {
      int count;
      do
      {
        count = gzipStream.Read(buffer, 0, 8192);
        if (count > 0)
          memoryStream.Write(buffer, 0, count);
      }
      while (count > 0);
      return JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(memoryStream.ToArray()));
    }
  }
}

我们所有的服务都构建为 Release/32 位。我们没有对线程池的使用进行任何调整。

【问题讨论】:

    标签: c# async-await threadpool nancy dotnet-httpclient


    【解决方案1】:

    我在这段代码中看到的最大问题是Decompress&lt;T&gt; 方法,它使用Task.Result 阻塞异步操作。这可能会延迟当前处理线程池请求的线程的检索,或者更糟糕的是导致代码中的死锁(这正是您shouldn't block on async code 的原因)。我不确定您是否已经看到这些请求得到彻底处理,但如果 NancyFX 正在为您处理同步上下文的封送处理(看起来像 it does),这很可能是线程池饥饿的根本原因。

    您也可以通过使所有 IO 处理在该方法 async 中工作来改变这一点,并利用这些类已经公开的自然异步 API。或者,我绝对不建议这样做,您可以在任何地方使用ConfigureAwait(false)

    旁注 - 您可以使用Stream.CopyToAsync() 简化代码)

    正确的异步实现如下所示:

    private async Task<T> DecompressAsync<T>(HttpResponseMessage response)
    {
        if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
            return await response.Content.ReadAsAsync<T>();
    
        const int bufferSize = 8192;        
        using (GZipStream gzipStream = new GZipStream(
                                       new MemoryStream(
                                            await response.Content.ReadAsByteArrayAsync()), 
                                            CompressionMode.Decompress))
        using (MemoryStream memoryStream = new MemoryStream())
        {
            await gzipStream.CopyToAsync(memoryStream, bufferSize);
            return JsonConvert.DeserializeObject<T>(
                        Encoding.UTF8.GetString(memoryStream.ToArray()));
        }
    }
    

    【讨论】:

    • 感谢您的详细回复。我怀疑解压缩会阻止我们执行。我会通知维护客户端库的团队,我可能会接受这个作为答案
    • @onatm 希望有帮助!
    • @onatm 很高兴听到。
    猜你喜欢
    • 2012-07-26
    • 1970-01-01
    • 2021-06-08
    • 2021-02-20
    • 1970-01-01
    • 2020-08-05
    • 2023-01-07
    • 2017-12-15
    • 2021-06-25
    相关资源
    最近更新 更多