【问题标题】:RateLimiting - Incorrect limitingRateLimiting - 不正确的限制
【发布时间】:2022-10-02 13:27:14
【问题描述】:

我有一个 RabbitMQ 队列,里面装满了数千条消息。我需要我的消费者每秒消费 1 条消息,所以我使用 Polly 实现了 RateLimit 策略。我的配置如下:

public static IAsyncPolicy GetPolicy(int mps)
{
    if (mps <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(mps));
    }
    
    return Policy
        .HandleResult<HttpResponseMessage>(result => {
            return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
        })
        .Or<Polly.RateLimit.RateLimitRejectedException>()
        .WaitAndRetryForeverAsync((retryNum, context) => {
            Console.WriteLine($\"Retrying. Num: {retryNum}\");
            return TimeSpan.FromSeconds(1);
        }).WrapAsync(
            Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}

其中mps 是 1

现在我注意到以下内容:

  • 一开始,我的队列中消耗了 50 条消息,时间跨度为 1 秒。 RateLimiter 看起来不起作用
  • 然后,每秒消耗一条消息,WaitAndRetryForeverAsync 执行多次(数十次)

如果我将 mps 设置为 50,则会发生以下情况:

  • 在开始时立即消耗 50 条消息
  • 然后每秒消耗 20 条消息(而不是预期的 50 条)

Policy.RateLimitAsync 调用是否存在错误?
难道我做错了什么?

  • 你想用WaitAndRetryForeverAsync 实现什么?
  • 它实际上是: .WaitAndRetryForeverAsync((retryNum, context) => { Console.WriteLine($\"Retrying.Num: {retryNum}\"); return TimeSpan.FromSeconds(2); }) 我在等待在尝试执行我的“RabbitMQ Consumption”之前等待 2 秒
  • 您的消费者是单线程还是多线程?
  • 我不知道。它是 RabbitMQ 队列的标准“EasyNetQ”消费者。我认为它是多线程的。 (我几乎可以肯定它是)
  • 好的,期望的目标是什么? mps 消息数量/线程或mps 消息数量/“消费者组”?您想在消费者线程之间共享此策略吗?

标签: c# rate-limiting polly retry-logic


【解决方案1】:

我必须在这里强调,在撰写本文时,限速器政策被认为是相当新的。它从 7.2.3 开始可用,这是当前的稳定版本。因此,它不像其他政策那样成熟。


基于its documentation,我认为还不清楚how does it really work

让我通过一个简单的例子告诉你我的意思

var localQueue = new Queue<int>();
for (int i = 0; i < 1000; i++)
{
    localQueue.Enqueue(i);
}

RateLimitPolicy rateLimiter = Policy
    .RateLimit(20, TimeSpan.FromSeconds(1));

while (localQueue.TryPeek(out _))
{
    rateLimiter.Execute(() =>
    {
        Console.WriteLine(localQueue.Dequeue());
        Thread.Sleep(10);
    });
}

如果你运行这个程序,它将打印01,然后它会以RateLimitRejectedException 崩溃。

  • 为什么?
  • 为什么不打印前 20 个然后崩溃?

答案是政策的定义方式可以防止虐待.我们在两次操作之间只等待 10 毫秒。它被认为是一个虐待从政策角度。

所以,没有这个防止滥用我们将在 200 毫秒内消耗允许的带宽,并且在剩余的 800 毫秒内我们无法执行任何进一步的操作。

将睡眠时间更改为 49

结果将或多或少相同

  • 它可能可以达到 1 以外的其他数字,但肯定不能达到 20

将睡眠持续时间更改为 50

它可以愉快地消耗整个队列而不会停止

为什么?因为 1000 毫秒 / 20 允许执行 = 50 毫秒令牌

这意味着您允许的执行是分布式的均匀地随着时间的推移。

允许突发

让我们玩一下突发模式。

让我们将 maxBurst 设置为 2,将 sleep 设置为 49

RateLimitPolicy rateLimiter = Policy
    .RateLimit(20, TimeSpan.FromSeconds(1), 2);

while (localQueue.TryPeek(out _))
{
    rateLimiter.Execute(() =>
    {
        Console.WriteLine(localQueue.Dequeue());
        Thread.Sleep(49);
    });
}

在这种模式下,将在 10-20 之间抛出异常。 (多次运行)

让我们将 maxBurst 更改为 4。它将在 20-60 之间崩溃...


结论

因此,速率限制器不能像you might expect 那样工作。

【讨论】:

    【解决方案2】:

    我找到了问题的答案(感谢Polly's GitHub)。正确的配置如下:

    public static IAsyncPolicy<HttpResponseMessage> GetPolicy(int mps)
    {
        if (mps <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(mps));
        }
    
        IAsyncPolicy<HttpResponseMessage> limit = Policy
            .RateLimitAsync(mps, TimeSpan.FromSeconds(1), mps,
                (retryAfter, context) => {
                    var response = new HttpResponseMessage(System.Net.HttpStatusCode.TooManyRequests);
                    response.Headers.Add("Retry-After", retryAfter.Milliseconds.ToString());
                    return response;
                });
    
        IAsyncPolicy<HttpResponseMessage> retry = Policy
            .HandleResult<HttpResponseMessage>(result => result.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
            .Or<Polly.RateLimit.RateLimitRejectedException>()
            .WaitAndRetryForeverAsync((retryNum) => {
                return TimeSpan.FromSeconds(1);
            });
    
        var resilienceStrategy = Policy.WrapAsync(retry, limit);
    
        return resilienceStrategy;
    }
    

    【讨论】:

      猜你喜欢
      • 2022-06-11
      • 1970-01-01
      • 2010-11-18
      • 1970-01-01
      • 2011-12-18
      • 1970-01-01
      • 2021-06-23
      • 2016-02-02
      • 2020-05-01
      相关资源
      最近更新 更多