【问题标题】:Releasing threads during async tasks在异步任务期间释放线程
【发布时间】:2016-09-09 10:41:51
【问题描述】:

我的系统会产生大量必须并行运行的子进程

  • 请求的主线程将产生子进程并等待它们完成。
    • 那些子流程做一些处理
    • 然后与远程 API 对话
    • 然后对来自 API 的结果进行更多处理
  • 然后当所有子进程都完成(或超时)时,主线程继续运行

我们在线程计数方面遇到问题,因此我们希望通过在等待远程 API 时尝试释放线程来减少活动线程的数量。

最初我们使用WebRequest.GetResponse() 进行API 调用,这自然会在等待API 时保持空闲线程。

我们开始使用 EAP 模型(基于事件的异步编程......所有使用 IAsyncResult 的各种 .NET 方法),我们在其中调用 BeginGetResponse(CallbackTrigger),将 WaitHandles 传递回主线程,然后触发后API处理。

按照我们的理解,这意味着子进程线程终止,回调是由网卡级别的中断触发的,它会触发一个新的线程来发起回调。也就是说,在我们等待 API 调用时,没有线程等待运行 CallbackTrigger

如果人们能够确认这种理解会很好吗?

我们现在正在考虑迁移到 TPL 模型(任务并行库 ...Task<T>),使用 WebRequest.GetResponseAsync(),即 awaitable。我的印象是这是await\ async 所做的一部分......await 在远程源等待时将控制权传递回调用堆栈,如果我启动一堆@987654331 @able Tasks 然后调用 Tasks.WaitAll 则不会为每个任务保留线程而该任务正在远程 API 上等待

我是否正确理解了这一点?

【问题讨论】:

  • 小心WaitAll,它会冻结线程正在运行。您可能想查看 WhenAll 这是等待的,并在所有子任务完成后让任务恢复。
  • @Sidewinder94 是的,我们知道这一点......我们不想通过应用程序将异步代码推得太远 - 我们正在尝试(目前)只拥有这一点并行化在没有保持线程的情况下发生......我们可以在其线程上阻塞主线程。
  • 我不太熟悉 TPL 如何确定地回答您的问题,尽管我认为您是正确的
  • @Sidewinder94 是的,这是我的几位同事给出的答案。希望我有时间完成一个简单的单元测试并以一种或另一种方式证明它(届时我将在此处发布答案。)

标签: c# multithreading async-await httpwebrequest task-parallel-library


【解决方案1】:

如果人们能确认这种理解会很好吗?

是的。请注意,IAsyncResult/Begin*/End* 模式是 APM,而不是 EAP。 EAP 将是WebClient 的方法,其中DownloadAsync 方法在完成时触发DownloadCompleted 事件。

APM/EAP 是执行异步工作的 方式,但实际上是异步的(意味着它们不占用线程只是为了阻塞 I/O 完成)。它们之所以“难”,是因为它们使您的代码变得更加复杂——以至于大多数开发人员从未使用过它们,而是坚持使用同步代码。

我是否正确理解了这一点?

是的。通常,.NET 中的所有异步 I/O 都是使用作为线程池的一部分存在的单个 I/O 完成端口来实现的。无论 API 是 APM、EAP 还是 TAP,都是如此。

async/await 与 TAP 的整个想法是核心 Tasks(就像从 GetResponseAsync 返回的那些)仍然构建在相同的异步 I/O 系统上,然后是 async /await 使食用它们更加愉快;您可以使用 await 保持相同的方法,而不是使用回调 (APM) 或事件处理程序 (EAP)。

作为一个有趣的旁注,Task 实际上实现了IAsyncResult,并且从高层次的角度来看,APM 和 TAP 非常相似(IAsyncResultTask 都表示“飞行中”的操作)。

您应该会发现您的 TAP 代码比您当前的 APM/EAP 代码简单得多(并且更易于维护!),性能没有明显变化。

(顺便说一句,考虑转移到 HttpClient,它是从头开始设计时考虑到 TAP,而不是 HttpWebRequest/WebClient,后者已将 TAP 用螺栓固定在上面)。

不过……

我有一个系统,它产生了许多必须并行运行的子进程...

使用这种“管道”,您可能需要考虑转换为 TPL 数据流。 Dataflow 了解同步和异步 (TAP) 工作,并具有对限制的内置支持。与 TAP 本身相比,Dataflow 方法可以进一步简化您的代码。

【讨论】:

  • 非常感谢!如果您能指出一些参考资料来定义和正确解释 APM、EAP、TAP 和 TPL 之间的区别,那么我将不胜感激。
  • @Brondahl:APM、EAP 和 TAP 都是 here 解释的模式。 TPL 是一个更多关于并行编程的库/API。您可能还会发现 my book 很有帮助。
【解决方案2】:

在@Stephen Cleary 的回答之外,我设置了一个简短的测试来进一步证明这一点。

下面的代码在运行 Synchronous 方法时,不修改 SetMinThreads,当定位需要几秒钟才能返回的网站时,将为每个请求保持一个线程打开。它将显示越来越多的线程处于活动状态,它会立即启动前几个任务,但当它达到ThreadPool 的限制时会“阻塞”,并且只允许每半秒启动一次新线程,或者在旧请求时启动新线程结束。

设置更高的 MinThreadCount 会按预期推迟问题。

保持 MinThread Count 未设置,但切换到异步 (APM) 方法或 Await (TAP) 方法会导致所有任务立即启动,并且任何时候活动的线程数都保持在较低水平。

using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace LockTraceParser
{
  internal class AsyncThreadsTester
  {
    public void Run()
    {
      //ThreadPool.SetMinThreads(100, 100);

      Console.WriteLine("Beginning Test: ");
      LogThreadCounts();

      Test();
    }

    private void Test()
    {
      LogThreadCounts();

      for (int i = 0; i < 65; i++)
      {
        //StartParallelUserWorkItem(i);
        StartTask(i);
        Thread.Sleep(100); //sleep a while so that the other thread is working
        LogThreadCounts();
      }

      for (int i = 0; i < 40; i++)
      {
        Thread.Sleep(1100); //sleep a while so that the other thread is working
        LogThreadCounts();
      }
    }

    private void StartTask(int label)
    {
      var taskLabel = "Task " + label;
      Console.WriteLine("Enqueue " + taskLabel);
      Task.Run(() => GetResponseAwait(taskLabel));
    }

    private static void LogThreadCounts()
    {
      int worker;
      int io;
      ThreadPool.GetAvailableThreads(out worker, out io);
      Console.WriteLine("Worker Threads Available:" + '\t' + worker + '\t' + "IO Threads Available:" + '\t' + io + '\t' +
                        "Threads held by Process: " + '\t' + Process.GetCurrentProcess().Threads.Count);
    }


    private void GetResponseSync(object label)
    {
      Console.WriteLine("Start Sync     " + label);
      try
      {
        var req = GetRequest();
        using (var resp = req.GetResponse())
        {
          Console.WriteLine(resp.ContentLength);
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("Error response " + label);
      }
      Console.WriteLine("End response   " + label);
    }

    private void BeginResponseAsync(object label)
    {
      Console.WriteLine("Start Async     " + label);
      try
      {
        var req = GetRequest();
        req.BeginGetResponse(EndGetResponseAsync, req);
      }
      catch (Exception e)
      {
        Console.WriteLine("Error Async " + label);
      }
    }

    private void EndGetResponseAsync(IAsyncResult result)
    {
      Console.WriteLine("Respond Async   ");
      var req = (WebRequest)result.AsyncState;

      using (var resp = req.EndGetResponse(result))
      {
        Console.WriteLine(resp.ContentLength);
      }
      Console.WriteLine("End Async   ");
    }

    private async Task GetResponseAwait(object label)
    {
      Console.WriteLine("Start Await     " + label);
      try
      {
        var req = GetRequest();
        using (var resp = await req.GetResponseAsync())
        {
          Console.WriteLine(resp.ContentLength);
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("Error Await " + label);
      }
      Console.WriteLine("End Await   " + label);
    }

    private WebRequest GetRequest()
    {
      var req = WebRequest.Create("http://aslowwebsite.com");
      req.Timeout = (int)TimeSpan.FromSeconds(60).TotalMilliseconds;

      return req;
    }
  }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-13
    • 2011-03-16
    • 2022-11-02
    • 1970-01-01
    • 2015-05-08
    相关资源
    最近更新 更多