【问题标题】:Using new Thread instead of Task.Run on BlockingCollection在 BlockingCollection 上使用新线程而不是 Task.Run
【发布时间】:2020-12-15 11:57:29
【问题描述】:

我们的代码在返回时有时会出现问题

我有一个生产者/消费者应用程序,它启动多个消费者等待 BlockingCollection(命名队列)。

  for (var i = 0; i < 10; i++)
    {

    var t = Task.Factory.StartNew(async () =>
     {
     try
      {
           await InboundQueue(stoppingToken.Value, queueProcessorId).ConfigureAwait(false);
                               
      }
      catch (OperationCanceledException ocex)
      {
                                
      }
      catch (Exception ex)
      {
                                
      }
      }, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach);

在处理 processMessage 之前,每个消费者都在 TryTake 方法中被阻止。

if (!_queue.TryTake(out var message, timeToWait, cancellationToken))
{
..
await processMessage((T)message).ConfigureAwait(false);
..
}

在 processMessage 中,我们多次调用数据库执行 async/await,使用 async/await 调用 httpclient,甚至在某些情况下执行 Task.Run 来执行操作而不等待完成。

await 需要一些时间才能完成,这很奇怪。我们在 processMessage 中添加了一些额外的 ConfigureAwait(false) 代码,它在某些情况下有所帮助。仍然缓慢等待发生。

https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#avoid-using-taskrun-for-long-running-work-that-blocks-the-thread 阅读这篇文章,我们将更改代码以使用“新线程”而不是“Task.Factory.StartNew”。

1.因为我们将为运行消费者创建一个标准线程,所以我们是否需要在 processMesssage 方法中额外的 ConfigureAwait(false) 调用异步 httpclient 或数据库?

2.processMessage 有自己的线程,可以轻松同步。我们可以在请求异步调用的库上安全地执行 .Result 吗?

3.我们如何调试这种情况?注意:通过跟踪网络我们发现https调用和数据库调用总是很快的。

提前致谢。

【问题讨论】:

标签: .net multithreading async-await


【解决方案1】:

原代码有问题:StartNew 看不懂async 代码。具体来说,LongRunning 标志在那里几乎毫无意义,因为它只适用于代码的一部分,直到它遇到第一个 await

我有一个生产者/消费者应用程序,它启动多个消费者等待 BlockingCollection(命名队列)。

在处理 processMessage 之前,每个消费者都在 TryTake 方法中被阻止。

所以你最终阻塞了一堆线程池线程,我怀疑这实际上是导致你的问题的原因(特别是如果这种模式在其他地方重复)。

await 需要一些时间才能完成。

这听起来像是线程池耗尽,如果线程池线程被定期阻塞,就会发生这种情况。

我们在 processMessage 中的一些代码中添加了额外的 ConfigureAwait(false),它在某些情况下有所帮助。

怀疑。在线程池线程上,ConfigureAwait(false) 无效。

https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#avoid-using-taskrun-for-long-running-work-that-blocks-the-thread阅读这篇文章,我们将更改代码以使用“新线程”而不是“Task.Factory.StartNew”。

嗯,了解上下文以及为什么原始代码不好是很重要的。 (旁注:大卫指出“不要将TaskCreationOptions.LongRunningasync 代码一起使用,因为这将创建一个新线程,该线程将在第一个await 之后被销毁”,这就是我上面第一段的原因)。

问题是线程池线程定期被阻塞。如果只是启动时的几个线程,我个人认为没关系,只允许线程池注入替换。但是如果代码经常阻塞线程池线程,那就是个问题,可能会导致线程池耗尽。

现在,关于解决方案,一种解决方案确实是使用手动线程,但这不是我推荐的解决方案。 IMO 一个更好的解决方案是消除阻塞。

具体来说,将BlockingCollection&lt;T&gt; 队列替换为async 友好的对象,例如System.Threading.Channels。或者,如果您对更大的重构感兴趣,请查看 TPL 数据流。

删除阻塞是比添加手动线程 IMO 更好的解决方案。也就是说,要回答您的具体问题:

因为我们将为运行消费者创建一个标准线程,所以我们是否需要在 processMesssage 方法中额外的 ConfigureAwait(false) 调用异步 httpclient 或数据库?

对于手动线程 - 就像线程池线程一样 - ConfigureAwait(false) 毫无意义。

processMessage 有自己的线程,可以轻松同步。

实际上,它必须是同步的。自定义线程不能是异步的(线程在第一个await 处退出)。除非你安装了单线程上下文,否则ConfigureAwait(false)确实有意义。这变得相当复杂。

我们可以在请求异步调用的库上安全地执行 .Result 吗?

这取决于库。但他们可能会很好。注意:GetAwaiter().GetResult()Result 类似,只是它不会将异常包装在 AggregateException 中。

我们如何调试这种情况?

这是个好问题。 This article 可能会有所帮助;本质上,捕获性能分析并查找常规线程注入事件。然后,您可以通过拍摄快照并检查线程堆栈来找到常见的阻塞 API。

【讨论】:

  • 您是否建议将 Task.Factory.StartNew 替换为 Task.Run?
  • 如果需要将工作排队到线程池线程,那么可以。
  • 我们需要交易并行运行。
  • 我相信你需要并发;我认为不需要并行性。只需使用async 方法即可获得异步并发(如果需要加入结果,请使用Task.WhenAll)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-12-19
  • 1970-01-01
  • 2012-07-11
  • 2015-10-27
  • 2015-08-23
  • 1970-01-01
相关资源
最近更新 更多