【问题标题】:Await multiple async Task while setting max running task at a time一次设置最大运行任务时等待多个异步任务
【发布时间】:2014-04-26 20:58:15
【问题描述】:

所以我刚开始尝试理解 async、Task、lambda 等,但我无法让它像我想要的那样工作。使用下面的代码,我希望它锁定 btnDoWebRequest,将未知数量的 WebRequest 作为任务执行,一旦所有任务完成,解锁 btnDoWebRequest。但是,我只希望一次运行最多 3 个或我设置的任何数量的任务,部分来自Have a set of Tasks with only X running at a time

但在尝试并以多种方式修改我的代码后,它总是会立即跳回并重新启用 btnDoWebRequest。当然,VS警告我需要等待,目前在“.ContinueWith((task)”和“await Task.WhenAll(requestInfoList .Select(async i =>”)中的异步,但似乎无法在哪里工作或如何进行所需的等待。当然,由于我仍在学习,我很有可能在这一切上都做错了,整个事情都需要重新设计。所以任何帮助都将不胜感激。

谢谢

    private SemaphoreSlim maxThread = new SemaphoreSlim(3);
    private void btnDoWebRequest_Click(object sender, EventArgs e)
    {
        btnDoWebRequest.Enabled = false;
        Task.Factory.StartNew(async () => await DoWebRequest()).Wait();
        btnDoWebRequest.Enabled = true;
    }

    private async Task DoWebRequest()
    {
        List<requestInfo> requestInfoList = new List<requestInfo>();
        for (int i = 0; dataRequestInfo.RowCount - 1 > i; i++)
        {
            requestInfoList.Add((requestInfo)dataRequestInfo.Rows[i].Tag);
        }
        await Task.WhenAll(requestInfoList .Select(async i => 
        {
            maxThread.Wait();
            Task.Factory.StartNew(() =>
            {
                var task = Global.webRequestWork(i);
            }, TaskCreationOptions.LongRunning).ContinueWith((task) => maxThread.Release());
        }));
    }

【问题讨论】:

  • 编译器错误可能是因为您的事件处理程序没有 async 关键字?如果没有这个,我怀疑处理程序中的 await 关键字是否按预期工作,即等待长时间运行的操作完成。
  • 而且,由于您的事件处理程序将阻塞,直到一切完成,您将切换服务器上按钮的启用状态,并且只有在您要调试代码时才会证明这一点。

标签: c# multithreading asynchronous async-await task-parallel-library


【解决方案1】:

首先,不要默认使用Task.Factory.StartNew。事实上,在async 代码中应该避免这种情况。如果您需要在后台线程上执行代码,请使用Task.Run

在你的情况下,没有必要使用Task.Run(或Task.Factory.StartNew)。

从最低级别开始,然后逐步向上。您已经有了一个异步 Web 请求方法,我将把它重命名为 WebRequestAsync 以遵循 Task-based Asynchronous Programming 命名准则。

接下来,使用SemaphoreSlim 上的异步 API 来限制它:

await maxThread.WaitAsync();
try
{
  await Global.WebRequestWorkAsync(i);
}
finally
{
  maxThread.Release();
}

对每个请求信息都这样做(注意不需要后台线程):

private async Task DoWebRequestsAsync()
{
  List<requestInfo> requestInfoList = new List<requestInfo>();
  for (int i = 0; dataRequestInfo.RowCount - 1 > i; i++)
  {
    requestInfoList.Add((requestInfo)dataRequestInfo.Rows[i].Tag);
  }
  await Task.WhenAll(requestInfoList.Select(async i => 
  {
    await maxThread.WaitAsync();
    try
    {
      await Global.WebRequestWorkAsync(i);
    }
    finally
    {
      maxThread.Release();
    }
  }));
}

最后,从您的 UI 中调用它(同样,不需要后台线程):

private async void btnDoWebRequest_Click(object sender, EventArgs e)
{
  btnDoWebRequest.Enabled = false;
  await DoWebRequestsAsync();
  btnDoWebRequest.Enabled = true;
}

总之,只在需要的时候使用Task.Run;不要使用Task.Factory.StartNew,也不要使用Wait(改用await)。我的博客上有 async intro 以提供更多信息。

【讨论】:

    【解决方案2】:

    您的代码有几个问题:

    1. 在任务上使用 Wait() 就像同步运行事物一样,因此您只会注意到 UI 会在所有操作完成并重新启用按钮时做出反应。您需要等待异步方法才能真正运行异步。更重要的是,如果一个方法正在像 Web 请求一样执行 IO 绑定工作,则启动一个新的线程池线程(使用 Task.Factory.StartNew)是多余的,并且是资源的浪费。

    2. 您的按钮单击事件处理程序需要标记为异步,以便您可以在方法中等待。

    3. 为了清楚起见,我已经清理了您的代码,使用新的 SemaphoreSlim WaitAsync 并将您的 for 替换为 LINQ 查询。您只能将前两点应用到您的代码中。

      private SemaphoreSlim maxThread = new  SemaphoreSlim(3);
      
      private async void btnDoWebRequest_Click(object  sender, EventArgs e)
      {
          btnDoWebRequest.Enabled = false;
          await DoWebRequest();
          btnDoWebRequest.Enabled = true;
       }
      
       private async Task DoWebRequest()
       {
           List<requestInfo> requestInfoList = new List<requestInfo>();
      
           var requestInfoList =  dataRequestInfo.Rows.Select(x => x.Tag).Cast<requestInfo>();
      
          var tasks = requestInfoList.Select(async I => 
          {
               await maxThread.WaitAsync();
               try
               {
                   await Global.webRequestWork(i);
               }
               finally
               {
                   maxThread.Release();
               }
         });
      
         await Task.WhenAll(tasks);
      

    【讨论】:

      【解决方案3】:

      我为此创建了一个扩展方法。

      可以这样使用:

      var tt = new List<Func<Task>>()
      {
          () => Thread.Sleep(300), //Thread.Sleep can be replaced by your own functionality, like calling the website
          () => Thread.Sleep(800),
          () => Thread.Sleep(250),
          () => Thread.Sleep(1000),
          () => Thread.Sleep(100),
          () => Thread.Sleep(200),
      };
      await tt.WhenAll(3); //this will let 3 threads run, if one ends, the next will start, untill all are finished.
      

      扩展方法:

      public static class TaskExtension
      {
          public static async Task WhenAll(this List<Func<Task>> actions, int threadCount)
          {
              var _countdownEvent = new CountdownEvent(actions.Count);
              var _throttler = new SemaphoreSlim(threadCount);
      
              foreach (Func<Task> action in actions)
              {
                  await _throttler.WaitAsync();
      
                  Task.Run(async () =>
                  {
                      try
                      {
                          await action();
                      }
                      finally
                      {
                          _throttler.Release();
                          _countdownEvent.Signal();
                      }
                  });
              }
      
              _countdownEvent.Wait();
          }
      }
      

      【讨论】:

        【解决方案4】:

        我们可以使用 SemaphoreSlim 轻松实现这一点。我创建的扩展方法:

            /// <summary>
            /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
            /// </summary>
            /// <typeparam name="T">Type of IEnumerable</typeparam>
            /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
            /// <param name="action">an async <see cref="Action" /> to execute</param>
            /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
            /// Must be grater than 0</param>
            /// <returns>A Task representing an async operation</returns>
            /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
            public static async Task ForEachAsyncConcurrent<T>(
                this IEnumerable<T> enumerable,
                Func<T, Task> action,
                int? maxActionsToRunInParallel = null)
            {
                if (maxActionsToRunInParallel.HasValue)
                {
                    using (var semaphoreSlim = new SemaphoreSlim(
                        maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
                    {
                        var tasksWithThrottler = new List<Task>();
        
                        foreach (var item in enumerable)
                        {
                            // Increment the number of currently running tasks and wait if they are more than limit.
                            await semaphoreSlim.WaitAsync();
        
                            tasksWithThrottler.Add(Task.Run(async () =>
                            {
                                await action(item);
        
                                // action is completed, so decrement the number of currently running tasks
                                semaphoreSlim.Release();
                            }));
                        }
        
                        // Wait for all tasks to complete.
                        await Task.WhenAll(tasksWithThrottler.ToArray());
                    }
                }
                else
                {
                    await Task.WhenAll(enumerable.Select(item => action(item)));
                }
            }
        

        示例用法:

        await enumerable.ForEachAsyncConcurrent(
            async item =>
            {
                await SomeAsyncMethod(item);
            },
            5);
        

        【讨论】:

          猜你喜欢
          • 2019-05-07
          • 1970-01-01
          • 1970-01-01
          • 2014-03-18
          • 1970-01-01
          • 2012-11-15
          • 2020-04-24
          • 1970-01-01
          相关资源
          最近更新 更多