【问题标题】:How to limit number of HttpWebRequest per second towards a webserver?如何限制每秒对网络服务器的 HttpWebRequest 数量?
【发布时间】:2012-05-02 16:38:19
【问题描述】:

在使用 HttpWebRequest 向一个应用程序服务器发出并行请求时,我需要实现一种限制机制(每秒请求数)。我的 C# 应用程序必须每秒向远程服务器发出不超过 80 个请求。远程服务管理员施加的限制不是硬限制,而是我的平台和他们的平台之间的“SLA”。

使用HttpWebRequest时如何控制每秒的请求数?

【问题讨论】:

    标签: c# httpwebrequest


    【解决方案1】:

    我遇到了同样的问题,但找不到现成的解决方案,所以我做了一个,就在这里。这个想法是使用BlockingCollection<T> 添加需要处理的项目,并使用响应式扩展来订阅速率受限的处理器。

    油门类是this rate limiter的重命名版

    public static class BlockingCollectionExtensions
    {
        // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed)
        public static IObservable<T> AsRateLimitedObservable<T>(this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken)
        {
            Subject<T> subject = new Subject<T>();
    
            // this is a dummyToken just so we can recreate the TokenSource
            // which we will pass the proxy class so it can cancel the task
            // on disposal
            CancellationToken dummyToken = new CancellationToken();
            CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken);
    
            var consumingTask = new Task(() =>
            {
                using (var throttle = new Throttle(items, timePeriod))
                {
                    while (!sequence.IsCompleted)
                    {
                        try
                        {
                            T item = sequence.Take(producerToken);
                            throttle.WaitToProceed();
                            try
                            {
                                subject.OnNext(item);
                            }
                            catch (Exception ex)
                            {
                                subject.OnError(ex);
                            }
                        }
                        catch (OperationCanceledException)
                        {
                            break;
                        }
                    }
                    subject.OnCompleted();
                }
            }, TaskCreationOptions.LongRunning);
    
            return new TaskAwareObservable<T>(subject, consumingTask, tokenSource);
        }
    
        private class TaskAwareObservable<T> : IObservable<T>, IDisposable
        {
            private readonly Task task;
            private readonly Subject<T> subject;
            private readonly CancellationTokenSource taskCancellationTokenSource;
    
            public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource)
            {
                this.task = task;
                this.subject = subject;
                this.taskCancellationTokenSource = tokenSource;
            }
    
            public IDisposable Subscribe(IObserver<T> observer)
            {
                var disposable = subject.Subscribe(observer);
                if (task.Status == TaskStatus.Created)
                    task.Start();
                return disposable;
            }
    
            public void Dispose()
            {
                // cancel consumption and wait task to finish
                taskCancellationTokenSource.Cancel();
                task.Wait();
    
                // dispose tokenSource and task
                taskCancellationTokenSource.Dispose();
                task.Dispose();
    
                // dispose subject
                subject.Dispose();
            }
        }
    }
    

    单元测试:

    class BlockCollectionExtensionsTest
    {
        [Fact]
        public void AsRateLimitedObservable()
        {
            const int maxItems = 1; // fix this to 1 to ease testing
            TimeSpan during = TimeSpan.FromSeconds(1);
    
            // populate collection
            int[] items = new[] { 1, 2, 3, 4 };
            BlockingCollection<int> collection = new BlockingCollection<int>();
            foreach (var i in items) collection.Add(i);
            collection.CompleteAdding();
    
            IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None);
            BlockingCollection<int> processedItems = new BlockingCollection<int>();
            ManualResetEvent completed = new ManualResetEvent(false);
            DateTime last = DateTime.UtcNow;
            observable
                // this is so we'll receive exceptions
                .ObserveOn(new SynchronizationContext()) 
                .Subscribe(item =>
                    {
                        if (item == 1)
                            last = DateTime.UtcNow;
                        else
                        {
                            TimeSpan diff = (DateTime.UtcNow - last);
                            last = DateTime.UtcNow;
    
                            Assert.InRange(diff.TotalMilliseconds,
                                during.TotalMilliseconds - 30,
                                during.TotalMilliseconds + 30);
                        }
                        processedItems.Add(item);
                    },
                    () => completed.Set()
                );
            completed.WaitOne();
            Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>());
        }
    }
    

    【讨论】:

    【解决方案2】:

    Throttle() 和 Sample() 扩展方法(On Observable)允许您将快速的事件序列调整为“较慢”的序列。

    Sample(Timespan) 中的Here is a blog post with an example 可确保最大速率。

    【讨论】:

    • Sample() 和 Throttle() 的问题是它们跳过/丢弃样本以达到指定的速率。
    【解决方案3】:

    我最初的帖子讨论了如何通过客户端行为扩展向 WCF 添加节流机制,但后来有人指出我误读了这个问题(doh!)。

    总体而言,方法可以是检查一个确定我们是否违反速率限制的类。已经有很多关于如何检查违规率的讨论。

    Throttling method calls to M requests in N seconds

    如果您违反了速率限制,请休眠一段时间并再次检查。如果没有,请继续进行 HttpWebRequest 调用。

    【讨论】:

    • 在这个问题中,我指的不是 WCF Web 服务。它是关于一个简单的 HttpWebRequest 类的用法。
    • 啊,太晚了,我应该更仔细地阅读问题:) 您仍然可以尝试在调用 HttpWebRequest 之前的方法,检查另一个类以确保您不会违反 80 个请求/秒速率。我将在上面更新我的代码。
    • 它要求的是 C# 而不是 Java。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-31
    • 2017-03-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多