【Posted】: 2020-06-10 17:38:55
【Question】:
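I am writing code to handle events from a device. The events usually arrive in bursts (the common case), but in an error case they can fire very frequently.
The behavior I want is:
- Never forward more than one event within a given time window (1 minute).
- In the common case, don't split up a burst of events (typically a few seconds long); wait for a quiet period before forwarding.
- In the error case, if events are still being generated but none has been forwarded for 2x the time window, forward the last event.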
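I found that Throttle works the way I want: it waits for a quiet period and then sends the last event. However, if the events are spammy, Throttle never forwards anything, because the quiet-period timer keeps getting reset.
I found that Sample works well, except when a burst straddles the end of the time window: Sample then forwards an event from the middle of the burst, and I am only interested in the last event of a burst.
I understand this can most likely be solved with Switch or Join, but I haven't found an example close enough to my scenario to make it click for me.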
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Reactive.Concurrency;
using System.Diagnostics;
public class Program
{
    public static event EventHandler<FakeEventArgs> DeviceChange;
    public static TimeSpan window = TimeSpan.FromSeconds(60);
    public static uint eventCounter = 0;

    // Main awaits nothing, so it is declared synchronous; the event generators below run fire-and-forget.
    public static void Main()
    {
        var window2x = window + window;

        //Just to give a visual sense of when things are happening
        Observable.Interval(window).Subscribe(iterator => Console.WriteLine($"Non-sliding Window {iterator}"));

        //Create observable from standard event in order to use Rx.
        var eventsAsObservables = Observable.FromEventPattern<FakeEventArgs>
        (
            handler => DeviceChange += handler,
            handler => DeviceChange -= handler
        );

        //pure throttle doesn't work in the case where events are always firing faster than the time window (i.e. device with faulty connection)
        //eventsAsObservables
        //    .Throttle(window)
        //    .ObserveOn(ThreadPoolScheduler.Instance)
        //    .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay} Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });

        //pure sample doesn't work in the case where clusters of events are happening across the time window boundary (i.e. device unplugged right at time window)
        //eventsAsObservables
        //    .Sample(window)
        //    .ObserveOn(ThreadPoolScheduler.Instance)
        //    .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay} Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });

        var throttled = eventsAsObservables.Throttle(window);
        var sampled = eventsAsObservables.Sample(window2x);

        //plain merge will forward extra events to subscribers
        //throttled
        //    .Merge(sampled)
        //    .ObserveOn(ThreadPoolScheduler.Instance)
        //    .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay} Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });

        //How do I alter this to get the desired behavior?
        throttled
            .Select(selector => sampled)
            .Switch()
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Subscribe(evt =>
            {
                var now = DateTime.Now;
                Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay} Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}");
            });

        Console.WriteLine($"About to start raising events {DateTime.Now}");
        //RaiseEvent($"{++eventCounter}");

        //These events occur very frequently
        //They cause Throttle to never forward anything because the quiet timer gets reset
        _ = StartSpammyEventsAsync(100);

        //These events will burst on the time boundary
        //Causes Throttle to never forward event because the quiet timer gets reset just before it expires
        //Causes Sample to forward event from the middle of the burst instead of the end
        _ = StartBurstyEventsAsync(window);

        Console.WriteLine("\nPress ENTER to exit...\n");
        Console.ReadLine();
    }

    static void RaiseEvent(string eventedMessage) =>
        DeviceChange?.Invoke(null, new FakeEventArgs(eventedMessage));

    static async Task StartSpammyEventsAsync(int milliSeconds)
    {
        while (true)
        {
            await Task.Delay(milliSeconds).ConfigureAwait(false);
            //Increment first so the debug line and the raised event carry the same number
            var id = ++eventCounter;
            Debug.WriteLine($"Raising event {id}");
            RaiseEvent($"{id}");
        }
    }

    static async Task StartBurstyEventsAsync(TimeSpan window)
    {
        while (true)
        {
            await Task.Delay(window - TimeSpan.FromSeconds(1)).ConfigureAwait(false);

            //two second burst of events
            var start = DateTime.Now;
            var limit = TimeSpan.FromSeconds(2);
            while (DateTime.Now - start < limit)
            {
                await Task.Delay(100).ConfigureAwait(false);
                //Increment first so the debug line and the raised event carry the same number
                var id = ++eventCounter;
                Debug.WriteLine($"Raising event {id}");
                RaiseEvent($"{id}");
            }
        }
    }

    public class FakeEventArgs : EventArgs
    {
        public readonly string Message;
        public readonly DateTime Created;

        protected FakeEventArgs() { }

        public FakeEventArgs(string message)
            : base()
        {
            Created = DateTime.Now;
            Message = message;
        }
    }
}
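
For illustration only, here is a rough sketch of one direction that might cover the three requirements listed above: a Throttle whose wait is capped. It is not taken from the question's code and has not been run against the repro. The class name `ThrottleSketchExtensions`, the method name `ThrottleWithMaxWait`, and the parameters `quietPeriod` and `maxWait` are all made up for this example. The only assumption is the System.Reactive operators `Publish`, `Window`, `Throttle`, `Delay`, `Amb`, `TakeLast` and `SelectMany`.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical helper, not part of System.Reactive; all names here are illustrative.
public static class ThrottleSketchExtensions
{
    // Forwards the last event of each burst once quietPeriod has passed without events,
    // but closes the burst anyway once maxWait has elapsed since its first event.
    public static IObservable<T> ThrottleWithMaxWait<T>(
        this IObservable<T> source,
        TimeSpan quietPeriod,
        TimeSpan maxWait,
        IScheduler scheduler)
    {
        // Publish shares a single subscription to the source between the window
        // and the two timers that decide when the window closes.
        return source.Publish(shared => shared
            .Window(() =>
                // Whichever fires first closes the current window:
                // silence for quietPeriod, or maxWait after the window's first event.
                shared.Throttle(quietPeriod, scheduler)
                      .Amb(shared.Delay(maxWait, scheduler)))
            // Forward only the last event of each window; empty windows forward nothing.
            .SelectMany(burst => burst.TakeLast(1)));
    }
}
```

With the variables from the code above, this would presumably be called as `eventsAsObservables.ThrottleWithMaxWait(window, window2x, ThreadPoolScheduler.Instance)` in place of the `Select`/`Switch` chain. Under sustained spam it should forward roughly once per `maxWait`, and after any forwarded event the next one cannot arrive sooner than `quietPeriod`; whether that matches the "never more than once per window" rule exactly depends on whether the window is meant to be sliding or fixed.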
【Comments】:
Tags: c# system.reactive