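【Question Title】: Add delay to parallel API call
【Posted】: 2020-10-25 01:16:52
【Question】:

I am using Polly to make parallel API calls. The server cannot handle more than 25 calls per second, so I am wondering whether there is a way to add a 1-second delay after each batch of 25 calls?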

var policy = Policy
    .Handle<HttpRequestException>()
    .RetryAsync(3);

foreach (var mediaItem in uploadedMedia)
{
    var mediaRequest = new HttpRequestMessage { *** }
    async Task<string> func()
    {
        var response = await client.SendAsync(mediaRequest);
        return await response.Content.ReadAsStringAsync();
    }
    tasks.Add(policy.ExecuteAsync(() => func()));
}
await Task.WhenAll(tasks);

I added a counter as suggested below, but it does not seem to work:

foreach (var mediaItem in uploadedMedia.Items)
{
    var mediaRequest = new HttpRequestMessage
    {
        RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mediaItem.filename.S}"),
        Method = HttpMethod.Get,
        Headers = {
            { "id-token", id_Token },
            { "access-token", access_Token }
        }
    };

    async Task<string> func()
    {
        if (count == 24)
        {
            Thread.Sleep(1000);
            count = 0;
        }
        var response = await client.SendAsync(mediaRequest);
        count++;
        return await response.Content.ReadAsStringAsync();
    }
    tasks.Add(policy.ExecuteAsync(() => func()));
}

await Task.WhenAll(tasks);

foreach (var t in tasks)
{
    var postResponse = await t;
    urls.Add(postResponse);
}

【Comments】:

  • Add some kind of counter to the loop. Increment the counter at the end of the loop, and wait if counter % 25 == 0.
  • See my edit. Did I add it in the right place?
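
A likely reason the edited code does not throttle: `count` is incremented only after `SendAsync` completes, while the `count == 24` check runs as each request is being started, so the check rarely sees the expected value. `Thread.Sleep` also blocks the calling thread instead of awaiting. Below is a minimal sketch of the counter idea from the comment above, with the counter advanced when a call is started and `Task.Delay` awaited after every full batch. `BatchThrottle` and `RunInBatchesAsync` are hypothetical names introduced for illustration, not part of Polly or of the question's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchThrottle
{
    // Starts one operation per item and pauses after every batchSize starts.
    public static async Task<TResult[]> RunInBatchesAsync<T, TResult>(
        IEnumerable<T> items,
        Func<T, Task<TResult>> operation,
        int batchSize,
        TimeSpan pause)
    {
        var tasks = new List<Task<TResult>>();
        var started = 0;

        foreach (var item in items)
        {
            tasks.Add(operation(item));   // start the call without awaiting it
            started++;                    // count at start time, not at completion

            if (started % batchSize == 0)
                await Task.Delay(pause);  // asynchronous wait; no thread is blocked
        }

        // Results come back in the same order as the items.
        return await Task.WhenAll(tasks);
    }
}
```

In the question's code this would be called with `uploadedMedia.Items`, a delegate that wraps `policy.ExecuteAsync(...)`, a batch size of 25 and a pause of one second. Note that retries triggered inside the policy are not counted by this loop, so it only bounds the initial attempts.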

Tags: c# concurrency async-await polly


【Solution 1】:

From just a quick scan of the code, perhaps another similar solution is to add a Thread.Sleep(calculatedDelay):

    foreach (var mediaItem in uploadedMedia.Items)
    {
        Thread.Sleep(calculatedDelay);
        var mediaRequest = new HttpRequestMessage

where calculatedDelay is some value based on 1000/25.
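
For the limit in the question, 1000 ms divided by 25 calls gives 40 ms between consecutive starts. Here is a minimal sketch of that calculation, assuming `await Task.Delay` is acceptable in place of `Thread.Sleep` so that the loop does not block a thread. `PacedStarter`, `IntervalFor`, and `RunPacedAsync` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PacedStarter
{
    // Interval between consecutive starts, e.g. 25 calls per second -> 40 ms.
    public static TimeSpan IntervalFor(int maxCalls, TimeSpan window) =>
        TimeSpan.FromTicks(window.Ticks / maxCalls);

    // Starts one operation per item, waiting `interval` between starts.
    public static async Task<TResult[]> RunPacedAsync<T, TResult>(
        IEnumerable<T> items,
        Func<T, Task<TResult>> operation,
        TimeSpan interval)
    {
        var tasks = new List<Task<TResult>>();
        foreach (var item in items)
        {
            tasks.Add(operation(item));   // start the call without awaiting it
            await Task.Delay(interval);   // space out the next start
        }
        return await Task.WhenAll(tasks);
    }
}
```

This only spaces out when requests leave the client; it does not account for network latency or retries, which is the weakness discussed next.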

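However, I think you need a better solution than delaying by some fixed value, because you cannot be sure about the overhead and latency involved in transferring the data. Also, you do not say what happens when you exceed the limit of 25 and how the server responds: do you get an error, or is it handled more gracefully? That may be where a more reliable solution can be found.

【Comments】:

  • Yes, I agree that a better solution than adding a delay is needed. The server responds with a Too Many Requests error.
【Solution 2】: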

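There are many ways to do this, but it is fairly easy to write a simple, thread-safe, reusable async rate limiter.

The advantage of the async approach is that it does not block thread-pool threads, is fairly efficient, and works well in existing async workflows and pipelines such as TPL Dataflow and Reactive Extensions.

Example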

// 3 calls every 3 seconds as an example
var rateLimiter = new RateLimiter(3, TimeSpan.FromSeconds(3));

// create some work
var task1 = Task.Run(async () =>
   {
      for (var i = 0; i < 5; i++)
      {
         await rateLimiter.WaitAsync();
         Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
      }
   }

);
var task2 = Task.Run(async () =>
   {
      for (var i = 0; i < 5; i++)
      {
         await rateLimiter.WaitAsync();
         Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
      }
   }

);
await Task.WhenAll(task1, task2);

Output

4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:15
4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
4 : 10/25/2020 05:16:24


Usage

private RateLimiter _rateLimiter = new RateLimiter(25 , TimeSpan.FromSeconds(1));

...

async Task<string> func()
{
    await _rateLimiter.WaitAsync();

    var response = await client.SendAsync(mediaRequest);
    return await response.Content.ReadAsStringAsync();
}

public class RateLimiter
{
   private readonly CancellationToken _cancellationToken;
   private readonly TimeSpan _timeSpan;
   private bool _isProcessing;
   private readonly int _count;
   private readonly Queue<DateTime> _completed = new Queue<DateTime>();
   private readonly Queue<TaskCompletionSource<bool>> _waiting = new Queue<TaskCompletionSource<bool>>();
   private readonly object _sync = new object();

   public RateLimiter(int count, TimeSpan timeSpan, CancellationToken cancellationToken = default)
   {
      _count = count;
      _timeSpan = timeSpan;
      _cancellationToken = cancellationToken;
   }

   private void Cleanup()
   {
      // if cancellation was requested, we need to cancel all waiting items
      while (_cancellationToken.IsCancellationRequested && _waiting.Any())
         if (_waiting.TryDequeue(out var item))
            item.TrySetCanceled();

      _waiting.Clear();
      _completed.Clear();

      _isProcessing = false;
   }

   private async Task ProcessAsync()
   {
      try
      {
         while (true)
         {

            _cancellationToken.ThrowIfCancellationRequested();
            var time = DateTime.Now - _timeSpan;
            TimeSpan delay;

            lock (_sync)
            {
               // remove anything out of date from the queue
               while (_completed.Any() && _completed.Peek() < time)
                  _completed.TryDequeue(out _);

               // signal waiting tasks to process
               while (_completed.Count < _count && _waiting.Any())
               {
                  if (_waiting.TryDequeue(out var item))
                     item.TrySetResult(true);
                  _completed.Enqueue(DateTime.Now);
               }

               if (!_waiting.Any() && !_completed.Any())
               {
                  Cleanup();
                  break;
               }

               // read the queue while still holding the lock, since Queue<T> is not thread-safe
               delay = (_completed.Peek() - time) + TimeSpan.FromMilliseconds(20);
            }

            if (delay.Ticks > 0)
               await Task.Delay(delay, _cancellationToken);
         }
      }
      catch (OperationCanceledException)
      {
         lock (_sync)
            Cleanup();
      }
   }

   public ValueTask WaitAsync()
   {
      // ReSharper disable once InconsistentlySynchronizedField
      _cancellationToken.ThrowIfCancellationRequested();

      lock (_sync)
      {
         try
         {
            if (_completed.Count < _count && !_waiting.Any())
            {
               _completed.Enqueue(DateTime.Now);
               return new ValueTask();
            }

            var tcs = new TaskCompletionSource<bool>();
            _waiting.Enqueue(tcs);
            return new ValueTask(tcs.Task);
         }
         finally
         {
            if (!_isProcessing)
            {
               _isProcessing = true;
               _ = ProcessAsync();
            }
         }
      }
   }
}

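Note 1: This is best used with a max degree of parallelism.

Note 2: Although I tested this, I only wrote it for this answer as a novel solution.

【Comments】:

  • Having seen the implementation, I would not describe it as "fairly easy"!
  • @TheodorZoulias Yes, it was simpler in my head before I wrote it!
【Solution 3】: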

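The Polly library currently lacks a rate-limiting policy (requests/time). Fortunately, it is relatively easy to implement this functionality using a SemaphoreSlim. The trick for making the rate limiting happen is to configure the capacity of the semaphore equal to the dividend (requests), and, after acquiring the semaphore, to delay its Release for a time span equal to the divisor (time). This way the rate limit is applied consistently to any possible time window.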
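
The semaphore trick described above can be sketched without Polly roughly as follows. This is an illustration of the idea under stated assumptions, not the answer's original code: `SemaphoreRateLimiter`, `RunAsync`, and `ReleaseLaterAsync` are hypothetical names, and only standard `SemaphoreSlim` and `Task.Delay` calls are used.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class SemaphoreRateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public SemaphoreRateLimiter(int maxOperationsPerTimeUnit, TimeSpan timeUnit)
    {
        // Capacity equals the number of operations allowed per time unit.
        _semaphore = new SemaphoreSlim(maxOperationsPerTimeUnit, maxOperationsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task<TResult> RunAsync<TResult>(
        Func<Task<TResult>> operation, CancellationToken cancellationToken = default)
    {
        // Take a slot; waits asynchronously when all slots are in use.
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

        // Give the slot back one time unit after it was taken,
        // regardless of how long the operation itself runs.
        _ = ReleaseLaterAsync();

        return await operation().ConfigureAwait(false);
    }

    private async Task ReleaseLaterAsync()
    {
        await Task.Delay(_timeUnit).ConfigureAwait(false);
        _semaphore.Release();
    }
}
```

Because each slot is held for exactly one time unit from the moment an operation starts, no window of that length can contain more starts than the semaphore's capacity. The number of operations running at the same time is not bounded by this class.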

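Update: I realized that the Polly library is extensible and allows implementing custom policies with custom functionality. So I am dropping my original suggestion in favor of the custom RateLimitAsyncPolicy class below: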

public class RateLimitAsyncPolicy : AsyncPolicy
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimitAsyncPolicy(int maxOperationsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _semaphore = new SemaphoreSlim(maxOperationsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    protected async override Task<TResult> ImplementationAsync<TResult>(
        Func<Context, CancellationToken, Task<TResult>> action,
        Context context,
        CancellationToken cancellationToken,
        bool continueOnCapturedContext)
    {
        await _semaphore.WaitAsync(cancellationToken)
            .ConfigureAwait(continueOnCapturedContext);
        ScheduleSemaphoreRelease();
        return await action(context, cancellationToken).ConfigureAwait(false);
    }

    private async void ScheduleSemaphoreRelease()
    {
        await Task.Delay(_timeUnit);
        _semaphore.Release();
    }
}

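This policy ensures that no more than maxOperationsPerTimeUnit operations are started during any time window of size timeUnit. The duration of the operations is not taken into account. In other words, no restriction is imposed on how many operations can be running concurrently at any given moment. That restriction can optionally be imposed by the BulkheadAsync policy. The two policies (RateLimitAsyncPolicy and BulkheadAsync) can be combined, as shown in the example below: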

var policy = Policy.WrapAsync
(
    Policy
        .Handle<HttpRequestException>()
        .RetryAsync(retryCount: 3),

    new RateLimitAsyncPolicy(
        maxOperationsPerTimeUnit: 25, timeUnit: TimeSpan.FromSeconds(1)),

    Policy.BulkheadAsync( // Optional
        maxParallelization: 25, maxQueuingActions: Int32.MaxValue)
);

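The order matters only for the RetryAsync policy, which must be placed first, for the reason explained in the documentation:

BulkheadPolicy: Usually innermost unless it wraps a final TimeoutPolicy. Certainly inside any WaitAndRetry. The Bulkhead intentionally limits the parallelization. You want that parallelization devoted to running the delegate, not occupied by waits for a retry.

Similarly, the RateLimitPolicy must follow the Retry, so that each retry is treated as an independent operation and counts towards the rate limit.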

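【Comments】:

  • In retrospect, the RateLimitAsyncPolicy class is a bit leaky, because it starts some internal asynchronous operations (the ScheduleSemaphoreRelease method) that stay alive for some time after the last use of the RateLimitAsyncPolicy. This is not a big deal, but it could become a problem if you create many RateLimitAsyncPolicy instances initialized with long timeUnit time spans. A fix could be to make the class disposable, similar to the RateLimiter class in a linked answer.
【Solution 4】:

You should use Microsoft's Reactive Framework (aka Rx): NuGet System.Reactive and add using System.Reactive.Linq; then you can do this: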

HttpRequestMessage MakeMessage(MediaItem mi) => new HttpRequestMessage
{
    RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mi.filename}"),
    Method = HttpMethod.Get,
    Headers = {
        { "id-token", id_Token },
        { "access-token", access_Token }
    }
};

var urls = await
    uploadedMedia
        .Items
        .ToObservable()
        .Buffer(24)
        .Zip(Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0)), (mrs, _) => mrs)
        .SelectMany(mrs => mrs.ToObservable().SelectMany(mr => Observable.FromAsync(() => client.SendAsync(MakeMessage(mr)))))
        .SelectMany(x => Observable.FromAsync(() => x.Content.ReadAsStringAsync()))
        .ToList();

I could not test it, but it should be fairly close.

【Comments】:
