下面是一个 RateLimiter 类，您可以用它来限制异步操作的执行频率。它是“这个答案”中给出的 RateLimiter 类的一个更简单的实现。
/// <summary>
/// Throttles asynchronous workflows: at most a fixed number of callers may pass
/// <see cref="WaitAsync"/> during the configured time span.
/// </summary>
/// <remarks>
/// Each successful <see cref="WaitAsync"/> takes one slot from an internal
/// semaphore and starts a background delay that gives the slot back once the
/// time span has elapsed. <see cref="Dispose"/> cancels those pending delays.
/// </remarks>
public class RateLimiter : IDisposable
{
    // One semaphore slot per action allowed within the time span. The same
    // instance also serves as the lock object that serializes Release and Dispose.
    private readonly SemaphoreSlim _slots;

    // How long a consumed slot stays unavailable.
    private readonly TimeSpan _window;

    // Cancelled on Dispose so that pending delays end immediately.
    private readonly CancellationTokenSource _shutdownSource;

    // Cached copy of the token, so it can still be read after the source is disposed.
    private readonly CancellationToken _shutdownToken;

    private bool _isDisposed;

    /// <summary>Creates a limiter that admits a bounded number of actions per time span.</summary>
    /// <param name="maxActionsPerTimeUnit">Number of actions admitted per time span; must be at least 1.</param>
    /// <param name="timeUnit">
    /// Length of the time span; must be non-negative and no longer than
    /// <see cref="int.MaxValue"/> milliseconds.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">Either argument is outside its allowed range.</exception>
    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        }

        // NOTE(review): the upper bound presumably mirrors the longest delay
        // Task.Delay accepts - confirm against the targeted runtime.
        bool timeUnitInRange = timeUnit >= TimeSpan.Zero
            && timeUnit.TotalMilliseconds <= int.MaxValue;
        if (!timeUnitInRange)
        {
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        }

        _window = timeUnit;
        _slots = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _shutdownSource = new CancellationTokenSource();
        _shutdownToken = _shutdownSource.Token;
    }

    /// <summary>
    /// Asynchronously waits until a slot is available, then consumes it for the
    /// duration of the time span.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for a free slot.</param>
    /// <returns>A task that completes once the caller is allowed to proceed.</returns>
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _slots.WaitAsync(cancellationToken).ConfigureAwait(false);

        // The slot is returned by a fire-and-forget delay, not by the caller.
        ReleaseSlotAfterWindow();
    }

    // Waits for the time span (or for disposal) and then returns one slot to the
    // semaphore, unless the limiter has been disposed in the meantime.
    private async void ReleaseSlotAfterWindow()
    {
        try
        {
            await Task.Delay(_window, _shutdownToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Ignore: the delay was cancelled; the disposed check below decides what to do.
        }

        lock (_slots)
        {
            if (_isDisposed)
            {
                return;
            }

            _slots.Release();
        }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_slots)
        {
            if (!_isDisposed)
            {
                _slots.Dispose();
                _isDisposed = true;

                // Cancelling ends every pending delay started by WaitAsync.
                _shutdownSource.Cancel();
                _shutdownSource.Dispose();
            }
        }
    }
}
使用示例:
// Download every URL, letting at most 20 requests start per minute.
List<string> urls = GetUrls();
using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

// Waits for a free slot in the limiter before issuing the HTTP request.
async Task<string> DownloadThrottledAsync(string address)
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(address);
}

IEnumerable<Task<string>> downloads = urls.Select(address => DownloadThrottledAsync(address));
string[] documents = await Task.WhenAll(downloads);
注意：我添加了一个 Dispose 方法，这样可以取消 RateLimiter 类内部发起的异步操作。
使用完 RateLimiter 后应该调用该方法，否则挂起的异步操作除了会消耗与活动的
Task.Delay 任务相关的资源之外，还会阻止 RateLimiter 被及时地垃圾回收。
最初那个非常简单但存在泄漏问题的实现，可以在这个答案的第 2 个修订版（2nd revision）中找到。
下面我再补充一个更复杂的 RateLimiter 类的替代实现，它基于 Stopwatch 而不是 SemaphoreSlim。它的优点是不需要实现 IDisposable（无需释放），因为它不会在后台启动隐藏的异步操作。缺点是 WaitAsync 方法不支持 CancellationToken 参数，并且由于实现更复杂，出现错误的概率更高。
/// <summary>
/// Limits how many actions may start during the configured time span. Callers
/// await the task returned by <see cref="WaitAsync"/> before starting an action.
/// </summary>
/// <remarks>
/// Timestamps come from a <see cref="Stopwatch"/>. The class starts no
/// background work of its own, so it is not disposable.
/// </remarks>
public class RateLimiter
{
    // Time source for all timestamps; also used as the lock object.
    private readonly Stopwatch _stopwatch;

    // For every admitted action, the timestamp at which its slot becomes free
    // again (scheduled start + time unit). Entries are enqueued in ascending order.
    private readonly Queue<TimeSpan> _queue;

    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    /// <summary>Creates a limiter that admits a bounded number of actions per time span.</summary>
    /// <param name="maxActionsPerTimeUnit">Number of actions admitted per time span; must be at least 1.</param>
    /// <param name="timeUnit">Length of the time span; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException">Either argument is outside its allowed range.</exception>
    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // A limit below 1 would make WaitAsync index past the end of the queue.
        if (maxActionsPerTimeUnit < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        }

        // A negative time unit would make every slot expire before it was taken,
        // silently disabling the limit.
        if (timeUnit < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        }

        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    /// <summary>
    /// Reserves the next available slot and returns a task that completes when
    /// the caller may proceed.
    /// </summary>
    /// <returns>
    /// <see cref="Task.CompletedTask"/> when a slot is free right now; otherwise
    /// a delay task that lasts until the reserved slot opens.
    /// </returns>
    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;

            // Drop slots whose release time has already passed.
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }

            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                // The queue may hold more entries than the limit when earlier
                // callers are still waiting. This caller may start when the
                // entry that lies exactly "limit" positions from the end frees up.
                var refTimestamp = _queue.ElementAt(_queue.Count - _maxActionsPerTimeUnit);
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }

            // Record when the slot reserved by this caller will be free again.
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }

        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}