【问题标题】:How can I use Reactive Extensions to throttle Events using a max window size?如何使用 Reactive Extensions 来限制使用最大窗口大小的事件?
【发布时间】:2013-11-30 19:37:15
【问题描述】:

场景

我正在构建一个 UI 应用程序,它每隔几毫秒从后端服务获取通知。收到新通知后,我想尽快更新 UI。

由于我可以在短时间内收到大量通知,而且我总是只关心最新的事件,因此我使用了响应式扩展框架的 Throttle() 方法。这使我可以忽略紧跟新通知的通知事件,因此我的 UI 保持响应。

问题:

假设我将通知事件的事件流限制为 50 毫秒,并且后端每 10 毫秒发送一次通知,Thottle() 方法将永远不会返回事件,因为它会一次又一次地重置其滑动窗口。在这里,我需要一些额外的行为来指定诸如超时之类的东西,以便在事件吞吐量如此之高的情况下,我可以每秒至少检索一个事件左右。我如何使用响应式扩展来做到这一点?

【问题讨论】:

    标签: c# system.reactive throttling


    【解决方案1】:

    正如 James 所说,Observable.Sample 将为您提供最新的收益。但是,它将在计时器上执行此操作,而不是根据油门中的第一个事件发生的时间。然而,更重要的是,如果您的采样时间很长(例如 10 秒),并且您的事件在采样后立即触发,那么您将在将近 10 秒内不会收到该新事件。

    如果您需要更紧凑的东西,则需要实现自己的功能。我冒昧地这样做了。这段代码肯定可以进行一些清理,但我相信它可以满足您的要求。

    public static class ObservableEx
    {
        public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
        {
            return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
        }
    
        public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
        {
            return Observable.Create<T>(o =>
            {
                var hasValue = false;
                T value = default(T);
    
                var maxTimeDisposable = new SerialDisposable();
                var dueTimeDisposable = new SerialDisposable();
    
                Action action = () =>
                {
                    if (hasValue)
                    {
                        maxTimeDisposable.Disposable = Disposable.Empty;
                        dueTimeDisposable.Disposable = Disposable.Empty;
                        o.OnNext(value);
                        hasValue = false;
                    }
                };
    
                return source.Subscribe(
                    x =>
                    {
                        if (!hasValue)
                        {
                            maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action);
                        }
    
                        hasValue = true;
                        value = x;
                        dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action);
                    },
                    o.OnError,
                    o.OnCompleted
                );
            });
        }
    }
    

    还有一些测试...

    [TestClass]
    public class ThrottleMaxTests : ReactiveTest
    {
        [TestMethod]
        public void CanThrottle()
        {
    
            var scheduler = new TestScheduler();
            var results = scheduler.CreateObserver<int>();
    
            var source = scheduler.CreateColdObservable(
                OnNext(100, 1)
                );
    
            var dueTime = TimeSpan.FromTicks(100);
            var maxTime = TimeSpan.FromTicks(250);
    
            source.ThrottleMax(dueTime, maxTime, scheduler)
                .Subscribe(results);
    
            scheduler.AdvanceTo(1000);
    
            results.Messages.AssertEqual(
                OnNext(200, 1)
                );
        }
    
        [TestMethod]
        public void CanThrottleWithMaximumInterval()
        {
    
            var scheduler = new TestScheduler();
            var results = scheduler.CreateObserver<int>();
    
            var source = scheduler.CreateColdObservable(
                OnNext(100, 1),
                OnNext(175, 2),
                OnNext(250, 3),
                OnNext(325, 4),
                OnNext(400, 5)
                );
    
            var dueTime = TimeSpan.FromTicks(100);
            var maxTime = TimeSpan.FromTicks(250);
    
            source.ThrottleMax(dueTime, maxTime, scheduler)
                .Subscribe(results);
    
            scheduler.AdvanceTo(1000);
    
            results.Messages.AssertEqual(
                OnNext(350, 4),
                OnNext(500, 5)
                );
        }
    
        [TestMethod]
        public void CanThrottleWithoutMaximumIntervalInterferance()
        {
            var scheduler = new TestScheduler();
            var results = scheduler.CreateObserver<int>();
    
            var source = scheduler.CreateColdObservable(
                OnNext(100, 1),
                OnNext(325, 2)
                );
    
            var dueTime = TimeSpan.FromTicks(100);
            var maxTime = TimeSpan.FromTicks(250);
    
            source.ThrottleMax(dueTime, maxTime, scheduler)
                .Subscribe(results);
    
            scheduler.AdvanceTo(1000);
    
            results.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(425, 2)
                );
        }
    }
    

    【讨论】:

    • 有趣。我当然可以看到最大油门持续时间很有用。
    • OnNextTestScheduler 的代码丢失。我知道这是一个非常古老的答案,但你还有吗?
    • 我进行了快速搜索并找到了这个,但是自从我使用 C# 以来已经有一段时间了,而且事情变化很快。 VS intellisense/resharper 或 VS Code C# 插件是否不建议导入? github.com/dotnet/reactive/blob/master/Rx.NET/Source/src/… 示例用法:github.com/dotnet/reactive/blob/…
    【解决方案2】:

    不要使用 Observable.Throttle,像这样使用 Observable.Sample,其中 TimeSpan 给出了所需的更新之间的最小间隔:

    source.Sample(TimeSpan.FromMilliseconds(50))
    

    【讨论】:

    • 如果您确实想了解如何限制限制,请在此处查看我的答案:stackoverflow.com/questions/19996205/…
    • 您也可以添加.DistinctUntilChanged(),因为这将防止同一事件被一遍又一遍地“采样”,这将更好地匹配 Throttle,因为它不会多次产生相同的事件. source.Sample(TimeSpan.FromMilliseconds(50)).DistinctUntilChanged()
    • 但是,如果您在没有新事件发生时几乎立即需要事件,但 不能等待超过指定的时间,则此解决方案将不起作用事件发生后的时间段。这可能需要特别关注...
    • 我认为考虑到每秒更新 UI 的场景,这可能没问题 ;) - .Throttle 当然也增加了一些延迟。
    • @Chris Sample 不会重新发出 same 事件,如果在一段时间内没有发生任何事情顺便说一句,所以 DistinctUntilChanged 仅在您想消除连续值时才需要- 平等的事件。还要注意将它放在Sample 之前或之后的重要区别。
    猜你喜欢
    • 2013-03-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-24
    • 1970-01-01
    • 1970-01-01
    • 2021-08-29
    • 1970-01-01
    相关资源
    最近更新 更多