【发布时间】:2022-10-02 13:27:14
【问题描述】:
我有一个 RabbitMQ 队列,里面装满了数千条消息。我需要我的消费者每秒消费 1 条消息,所以我使用 Polly 实现了 RateLimit 策略。我的配置如下:
public static IAsyncPolicy GetPolicy(int mps)
{
if (mps <= 0)
{
throw new ArgumentOutOfRangeException(nameof(mps));
}
return Policy
.HandleResult<HttpResponseMessage>(result => {
return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
})
.Or<Polly.RateLimit.RateLimitRejectedException>()
.WaitAndRetryForeverAsync((retryNum, context) => {
Console.WriteLine($\"Retrying. Num: {retryNum}\");
return TimeSpan.FromSeconds(1);
}).WrapAsync(
Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}
其中mps 是 1
现在我注意到以下内容:
- 一开始,我的队列中消耗了 50 条消息,时间跨度为 1 秒。 RateLimiter 看起来不起作用
- 然后,每秒消耗一条消息,
WaitAndRetryForeverAsync执行多次(数十次)
如果我将 mps 设置为 50,则会发生以下情况:
- 在开始时立即消耗 50 条消息
- 然后每秒消耗 20 条消息(而不是预期的 50 条)
Policy.RateLimitAsync 调用是否存在错误?
难道我做错了什么?
-
你想用
WaitAndRetryForeverAsync实现什么? -
它实际上是: .WaitAndRetryForeverAsync((retryNum, context) => { Console.WriteLine($\"Retrying.Num: {retryNum}\"); return TimeSpan.FromSeconds(2); }) 我在等待在尝试执行我的“RabbitMQ Consumption”之前等待 2 秒
-
您的消费者是单线程还是多线程?
-
我不知道。它是 RabbitMQ 队列的标准“EasyNetQ”消费者。我认为它是多线程的。 (我几乎可以肯定它是)
-
好的,期望的目标是什么?
mps消息数量/线程或mps消息数量/“消费者组”?您想在消费者线程之间共享此策略吗?
标签: c# rate-limiting polly retry-logic