【问题标题】:How do I use Rx.Net to handle events that may be bursty or spammy?如何使用 Rx.Net 处理可能是突发或垃圾邮件的事件?
【发布时间】:2020-06-10 17:38:55
【问题描述】:

我正在编写代码来处理来自设备的事件,其中所述事件可能以突发的方式发生,常见的情况,或者在错误的情况下可能非常频繁地发生。

我想要的行为是:

  • 在给定的时间窗口内,切勿多次转发事件。 (1 分钟)
  • 一般情况下,不要拆分事件突发(通常为几秒),等待静默期后再转发
  • 在错误情况下,如果正在生成事件但在 2 倍时间窗口内没有转发任何事件,则转发最后一个事件

我发现 Throttle 通过等待一段时间然后发送最后一个事件来按我想要的方式工作。但是,如果事件是垃圾邮件,则 Throttle 永远不会转发任何事件,因为静默期窗口会反复重置。

我发现 Sample 效果很好,除非突发发生在时间窗口的末尾,因为我对突发中间发生的事件不感兴趣。

我知道这很可能可以通过使用 Switch 或 Join 来解决,但我还没有找到一个与我的场景足够接近的示例来让我点击它。

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Reactive.Concurrency;
using System.Diagnostics;

public class Program
{
    public static event EventHandler<FakeEventArgs> DeviceChange;
    public static TimeSpan window = TimeSpan.FromSeconds(60);
    public static uint eventCounter = 0;

    public static async Task Main()
    {
        var window2x = window + window;
        //Just to give a visual sense of when things are happening
        Observable.Interval(window).Subscribe(iterator => Console.WriteLine($"Non-sliding Window {iterator}"));

        //Create observable from standard event in order to use Rx.
        var eventsAsObservables = Observable.FromEventPattern<FakeEventArgs>
            (
                handler => DeviceChange += handler,
                handler => DeviceChange -= handler
            );

        //pure throttle doesn't work in the case where events always firing faster than the time window (i.e. device with faulty connection)
        //eventsAsObservables
        //  .Throttle(window)
        //  .ObserveOn(ThreadPoolScheduler.Instance)
        //  .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });

        //pure sample doesn't work in the case where clusters of events are happening across the time window boundary (i.e. device unplugged right at time window)
        //eventsAsObservables
        //  .Sample(window)
        //  .ObserveOn(ThreadPoolScheduler.Instance)
        //  .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });

        var throttled = eventsAsObservables.Throttle(window);
        var sampled = eventsAsObservables.Sample(window2x);

        //plain merge will forward extra events to subscribers
        //throttled
        //  .Merge(sampled)
        //  .ObserveOn(ThreadPoolScheduler.Instance)
        //  .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });

        //How do I alter this to get the desired behavior?
        throttled
            .Select(selector => sampled)
            .Switch()
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });

        Console.WriteLine($"About to start raising events {DateTime.Now}");
        //RaiseEvent($"{++eventCounter}");

        //These events occur very frequently
        //They cause Throttle to never forward anything because the quiet timer gets reset
        StartSpammyEventsAsync(100);

        //These events will burst on the time boundary 
        //Causes Throttle to never forward event because the quiet timer gets reset just before it expires
        //Causes Sample to forward event from the middle of the burst instead of the end
        StartBurstyEventsAsync(window);

        Console.WriteLine("\nPress ENTER to exit...\n");
        Console.ReadLine();
    }

    static void RaiseEvent(string eventedMessage) =>
        DeviceChange?.Invoke(null, new FakeEventArgs(eventedMessage));

    static async Task StartSpammyEventsAsync(int milliSeconds)
    {
        while (true)
        {
            await Task.Delay(milliSeconds).ConfigureAwait(false);
            Debug.WriteLine($"Raising event {eventCounter}");
            RaiseEvent($"{++eventCounter}");
        }
    }

    static async Task StartBurstyEventsAsync(TimeSpan window)
    {
        while (true)
        {
            await Task.Delay(window - TimeSpan.FromSeconds(1)).ConfigureAwait(false);

            //two second burst of events
            var start = DateTime.Now;
            var limit = TimeSpan.FromSeconds(2);
            while (DateTime.Now - start < limit)
            {
                await Task.Delay(100).ConfigureAwait(false);
                Debug.WriteLine($"Raising event {eventCounter}");
                RaiseEvent($"{++eventCounter}");
            }
        }
    }

    public class FakeEventArgs : EventArgs
    {
        public readonly string Message;
        public readonly DateTime Created;

        protected FakeEventArgs() { }

        public FakeEventArgs(string message):
            base()
        {
            Created = DateTime.Now;
            Message = message;
        }
    }
} 

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    混合这两种方法怎么样?这是一个ThrottleUntil 扩展方法,它使用Throttle 等待“突发”结束,Interval 在“突发”未结束时发出项目(也称为“垃圾邮件”):

    public static IObservable<T> ThrottleUntil<T>(this IObservable<T> source, TimeSpan window, IScheduler scheduler)
    {
        var throttle = source.Throttle(window, scheduler);
        var until = Observable.Interval(window * 2, scheduler).Select(_ => default(T));
    
        return source
            .Buffer(() => Observable.Merge(throttle, until).Take(1))
            .SelectMany(buffer => buffer.Any() ? buffer.TakeLast(1) : Enumerable.Empty<T>());
    }
    

    这里有一些测试用例(大理石图对于表达你想要发生的事情非常有用)展示了它是如何工作的:

    [TestFixture]
    public class BurstySpammy
    {
        private static long SubscriptionOffset = ReactiveTest.Subscribed;
        private static long TestOffset = ReactiveTest.Created + ReactiveTest.Subscribed;
    
        private static IEnumerable<TestCaseData> TestCases
        {
            get
            {
                // Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
                // Source:   ----1---2---------------3---4-------------------5---------------------------- 
                // Expected: --------------------2-------------------4-----------------------5------------
                yield return new TestCaseData(new int[] { 1, 2, 6, 7, 13 }, new[] { 5, 10, 16 }, false).SetName("ShouldEmitFromBurst");
    
                // Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
                // Source:   ----1-------2-------3-------4-------5-------6-------------------------------- 
                // Expected: ------------------------3-----------------------6----------------------------
                yield return new TestCaseData(new int[] { 1, 3, 5, 7, 9, 11 }, new[] { 6, 12 }, true).SetName("ShouldEmitFromSpam");
    
                // Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
                // Source:   ----1---2---------------3---4---5-------6-------7---8---9-------------------- 
                // Expected: --------------------2-----------------------7-----------------------9--------
                yield return new TestCaseData(new int[] { 1, 2, 6, 7, 8, 10, 12, 13, 14 }, new[] { 5, 11, 17 }, false).SetName("ShouldEmitFromMix");
    
                // Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
                // Source:   ----1------------------------------------------------------------------------ 
                // Expected: ----------------1------------------------------------------------------------
                yield return new TestCaseData(new int[] { 1 }, new[] { 4 }, false).SetName("ShouldNotEmitEmpty");
            }
        }
    
        [TestCaseSource(nameof(TestCases))]
        public void ShouldEmitCorrectly(int[] sourceTimes, int[] expectedTimes, bool basedOnSubscriptionTime)
        {
            var scheduler = new TestScheduler();
    
            var source = sourceTimes
                .Select((time, index) => new Recorded<Notification<int>>(TimeSpan.FromSeconds(time).Ticks, Notification.CreateOnNext(index)))
                .ToArray();
    
            var expected = expectedTimes
                .Select(time => (Time: basedOnSubscriptionTime ? TimeSpan.FromSeconds(time).Ticks + SubscriptionOffset : TimeSpan.FromSeconds(time).Ticks, Value: source.Last(r => r.Time <= TimeSpan.FromSeconds(time).Ticks).Value.Value))
                .Select(tuple => new Recorded<Notification<int>>(tuple.Time, Notification.CreateOnNext(tuple.Value)))
                .ToArray();
    
            var xs = scheduler
                .CreateHotObservable(source)
                .ThrottleUntil(TimeSpan.FromSeconds(3), scheduler);
    
            var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(20).Ticks + TestOffset);
    
            CollectionAssert.AreEqual(expected, observed.Messages);
        }
    }
    

    希望对你有帮助。

    【讨论】:

    • 你的扩展没有为我编译好。它不喜欢 buffer.TakeLast(1) 因为 buffer 不是直接可观察的。这是否捕获了意图: public static IObservable ThrottleUntil(this IObservable source, TimeSpan window, IScheduler scheduler) { var throttle = source.Throttle(window, scheduler); var until = Observable.Interval(window + window, scheduler).Select(_ => default(T)); return source .Buffer(() => Observable.Merge(throttle, until).Take(1)) .SelectMany(buffer => buffer.Any() ? new List { buffer.Last() } :可枚举。空()); }
    • .TakeLast(x) 是来自System.Linq 的扩展方法。将此命名空间的 using 子句添加到您的代码中。
    • 我有 System.Linq 和 System.Reactive.Linq。 TakeLast 不采用 IList ,这是 SelectMany 中的缓冲区
    猜你喜欢
    • 2013-08-06
    • 1970-01-01
    • 2016-11-07
    • 1970-01-01
    • 1970-01-01
    • 2011-01-24
    • 1970-01-01
    • 2011-04-06
    • 2019-04-11
    相关资源
    最近更新 更多