【问题标题】:How to execute tasks in parallel but not more than N tasks per T seconds?如何并行执行任务但每 T 秒不超过 N 个任务?
【发布时间】:2020-02-13 12:37:12
【问题描述】:

我需要尽可能快地并行运行许多任务。但是如果我的程序每 1 秒运行超过 30 个任务,就会被阻塞。如何确保每 1 秒间隔运行的任务不超过 30 个?

换句话说,如果在最后 1 秒的时间间隔内完成了 30 个任务,我们必须阻止新任务启动。

我丑陋的可能解决方案:

private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();

    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));

        await task;

        timeList.Add(DateTime.Now);

        sem.Release();
    });

    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount 
    || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}

【问题讨论】:

  • 实际的节流由 ReactiveX 操作符提供,例如 WindowBuffer。您可以使用具有有限 DOP 的 ActionBlock&lt;T&gt; 和可能的 await Task.Delay() 来确保您每秒拨打的电话不超过 N 次
  • 谁投票关闭为“基于意见”,绝对不是。你可以争辩说过去也有类似的问题,但这绝对不是见仁见智的问题。
  • This is probably a duplicate。一个答案显示如何使用 DOP 为 50 的 DataFlow 块将并发操作限制为 50。另一个显示如何使用 SemaphoreSlim。可以使用 both - 一个 DOP 将操作限制为不超过 30(或更少),以及一个 SemaphoreSlim,每 1 秒由计时器重置一次。
  • 假设您最初开始了 30 个任务。在时间 0:00.5(半秒后)所有 30 个任务仍在运行。此时 0:01.0(一秒后)有 15 个任务已完成,还有 15 个仍在运行。是否允许再开始 15 个任务?如果是,那么在 0:00.5 - 0:01.1 的时间间隔内,有超过 30 个任务处于活动状态。如果不是,则只有在完成所有 30 个初始任务后,您才能开始新任务。哪一个是期望的行为?
  • 这是一个很好且足够详细的问题,为什么它被关闭了?

标签: c# .net asynchronous .net-core task-parallel-library


【解决方案1】:

用户代码永远不必直接控制任务的调度方式。一方面,它不能——控制任务的运行方式是TaskScheduler 的工作。当用户代码调用.Start()时,它只是简单地将一个任务添加到线程池队列中执行。 await 执行已经执行的任务。

TaskScheduler 示例展示了如何创建有限的并发调度程序,但同样有更好的高级选项。

问题的代码无论如何都不会限制排队的任务,它会限制可以等待的任务数量。他们都已经在运行了。这类似于在管道中批处理 previous 异步操作,只允许有限数量的消息传递到下一个级别。

有延迟的动作块

开箱即用的简单方法是使用具有有限 MaxDegreeOfParallelism 的 ActionBlock,以确保最多可以同时运行 N 个并发操作。如果我们知道每个操作需要多长时间,我们可以增加一点延迟以确保我们不会超过油门限制。

在这种情况下,7 个并发工作人员每秒执行 4 个请求,总共每秒最多 28 个请求。 BoundedCapacity 表示在 downloader.SendAsync 块之前最多只能将 7 个项目存储在输入缓冲区中。这样我们就可以避免在操作时间过长时淹没ActionBlock

var downloader = new ActionBlock<string>(
        async url => {
            await Task.Delay(250);
            var response=await httpClient.GetStringAsync(url);
            //Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity=7 }
);

//Start posting to the downloader
foreach(var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;

带有 SemaphoreSlim 的 ActionBlock

另一种选择是将其与由计时器定期重置的SemaphoreSlim 结合使用。

var refreshTimer = new Timer(_=>sm.Release(30));

var downloader = new ActionBlock<string>(
        async url => {
            await semaphore.WaitAsync();
            try 
            {
                var response=await httpClient.GetStringAsync(url);
                //Do something with it.
            }
            finally
            {
                semaphore.Release();
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity=5 }
);

//Start the timer right before we start posting 
refreshTimer.Change(1000,1000);
foreach(....)
{

}

【讨论】:

    【解决方案2】:

    这是sn-p:

    var tasks = new List<Task>();
    
    foreach(item in listNeedInsert)
    {
        var task = TaskToRun(item);
        tasks.Add(task);
    
        if(tasks.Count == 100)
        {
            await Task.WhenAll(tasks);
            tasks.Clear();
        }
    }
    
    // Wait for anything left to finish
    await Task.WhenAll(tasks);
    

    请注意,我宁愿将任务添加到 List&lt;Task&gt;(); 中,并且在添加完之后,我在同一个 List&lt;Task&gt;(); 中等待所有内容

    你在这里做什么:

     var tasks = taskList.Select(async task =>
        {
            do
            {
                sem.WaitOne();
            }
            while (timeList.Count <= maxIntervalCount 
            || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount]);
    
            await task;
    

    在任务完成之前一直阻塞,因此进行此调用:

    Task.WhenAll(tasks).Wait();
    

    完全多余。此外,这行Task.WhenAll(tasks).Wait(); 正在对WhenAll 方法执行不必要的阻塞。

    【讨论】:

      【解决方案3】:

      阻塞是由于某些服务器/防火墙/硬件限制还是基于观察?

      您应该尝试使用BlockingCollection&lt;Task&gt; 或类似的thread safe collections,尤其是当您的任务工作受I/O 限制时。您甚至可以将容量设置为 30:

      var collection = BlockingCollection<Task>(30);
      

      然后你可以启动2个异步方法:

      var population = Task.Factory.Start(Populate);
      var processing = Task.Factory.Start(Dequeue);
      await Task.WhenAll(population, processing);
      
      Task Populate()
      {
          foreach (...)
              collection.Add(...);
          collection.CompleteAdding();
      }
      Task Dequeue
      {
          while(!collection.IsComplete)
              await collection.Take();                            //consider using TryTake()
      }
      

      如果由于某些真正的限制而存在限制(应该非常罕见),请按如下方式更改 Populate():

      var stopper = Stopwatch.StartNew();
      for (var i = ....)                                          //instead of foreach
      {
          if (i % 30 == 0)
          {
              if (stopper.ElapsedMilliseconds < 1000)
                  Task.Delay(1000 - stopper.ElapsedMilliseconds); //note that this race condition should be avoided in your code
              stopper.Restart();
          }
          collection.Add(...);
      }
      collection.CompleteAdding();
      

      【讨论】:

        【解决方案4】:

        我认为这个问题可以通过 SemaphoreSlim 限制每个间隔的最大任务数来解决,也可以通过 Task.Delay 在每个任务完成后延迟释放 SemaphoreSlim 来解决,一段时间等于所需的节流间隔。下面是基于这个想法的一个实现。速率限制可以通过两种方式应用:

        1. 对于includeAsynchronousDuration: false,速率限制会影响在指定时间跨度内可以启动的操作数。不考虑每个操作的持续时间。

        2. 对于includeAsynchronousDuration: true,速率限制会影响在指定时间跨度内可以将多少操作计为“活动”,并且具有更多限制性(使枚举变慢)。不是将每个操作计为一个时间点(开始时),而是计为一个时间跨度(在开始和完成之间)。当且仅当它自己的时间跨度与指定的时间跨度相交时,一个操作在指定的时间跨度内被计为“活动”。

        /// <summary>
        /// Applies an asynchronous transformation for each element of a sequence,
        /// limiting the number of transformations that can start or be active during
        /// the specified time span.
        /// </summary>
        public static async Task<TResult[]> ForEachAsync<TSource, TResult>(
            this IEnumerable<TSource> source,
            Func<TSource, Task<TResult>> action,
            int maxActionsPerTimeUnit,
            TimeSpan timeUnit,
            bool includeAsynchronousDuration = false,
            bool onErrorContinue = false, /* Affects only asynchronous errors */
            bool executeOnCapturedContext = false)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (action == null) throw new ArgumentNullException(nameof(action));
            if (maxActionsPerTimeUnit < 1)
                throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
            if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
                throw new ArgumentOutOfRangeException(nameof(timeUnit));
        
            using var semaphore = new SemaphoreSlim(maxActionsPerTimeUnit,
                maxActionsPerTimeUnit);
            using var cts = new CancellationTokenSource();
            var tasks = new List<Task<TResult>>();
            var releaseTasks = new List<Task>();
        
            try // Watch for exceptions thrown by the source enumerator
            {
                foreach (var item in source)
                {
                    try
                    {
                        await semaphore.WaitAsync(cts.Token)
                            .ConfigureAwait(executeOnCapturedContext);
                    }
                    catch (OperationCanceledException) { break; }
        
                    // Exceptions thrown synchronously by invoking the action are breaking
                    // the loop unconditionally (the onErrorContinue has no effect on them).
                    var task = action(item);
                    if (!onErrorContinue) task = ObserveFailureAsync(task);
                    tasks.Add(task);
                    releaseTasks.Add(ScheduleSemaphoreReleaseAsync(task));
                }
            }
            catch (Exception ex) { tasks.Add(Task.FromException<TResult>(ex)); }
            cts.Cancel(); // Cancel all release tasks
        
            Task<TResult[]> whenAll = Task.WhenAll(tasks);
            try { return await whenAll.ConfigureAwait(false); }
            catch (OperationCanceledException) when (whenAll.IsCanceled) { throw; }
            catch { whenAll.Wait(); throw; } // Propagate AggregateException
            finally { await Task.WhenAll(releaseTasks); }
        
            async Task<TResult> ObserveFailureAsync(Task<TResult> task)
            {
                try { return await task.ConfigureAwait(false); }
                catch { cts.Cancel(); throw; }
            }
        
            async Task ScheduleSemaphoreReleaseAsync(Task<TResult> task)
            {
                if (includeAsynchronousDuration)
                    try { await task.ConfigureAwait(false); } catch { } // Ignore exceptions
                // Release only if the Task.Delay completed successfully
                try { await Task.Delay(timeUnit, cts.Token).ConfigureAwait(false); }
                catch (OperationCanceledException) { return; }
                semaphore.Release();
            }
        }
        

        使用示例:

        int[] results = await ForEachAsync(Enumerable.Range(1, 100), async n =>
        {
            await Task.Delay(500); // Simulate some asynchronous I/O-bound operation
            return n;
        }, maxActionsPerTimeUnit: 30, timeUnit: TimeSpan.FromSeconds(1.0),
            includeAsynchronousDuration: true);
        

        使用catch+Wait 技术传播AggregateException 的原因在here 中进行了解释。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-06-08
          • 1970-01-01
          相关资源
          最近更新 更多