【问题标题】:Enumerating events occuring in time using reactive extensions (rx)使用响应式扩展 (rx) 枚举及时发生的事件
【发布时间】:2014-05-09 19:06:08
【问题描述】:
 public interface Event
 {
      Guid identifier;
      Timestamp ts;
 }

我们正在考虑使用响应式扩展来重写我的金融公司的问题。

前提是我们获得由 Guid(股票代码 + 嵌入其中的唯一性熵)、时间戳和 Value 字段标识的事件。这些以很高的速度出现,我们不能对这些对象采取行动,直到“至少”在 X 秒(10 秒)之后,我们必须对它们采取行动,并将它们从系统中删除。

把它想象成两个窗口,一个“10 秒”的初始窗口(例如 T0 到 T10),我们在其中识别所有唯一事件(基本上,按 guid 分组),然后我们查看下一个“10 秒” ", "次要窗口" (T10-T20),以确保我们正在实施“至少” 10 秒的政策。从“初始窗口”中,我们删除所有事件(因为我们已经考虑了它们),然后从“辅助窗口”中,我们删除“初始窗口”中发生的事件。我们继续移动 10 秒滑动窗口,所以现在我们正在查看窗口 T20-T30,重复并冲洗。

我如何在 Rx 中实现它,因为它似乎是要走的路。

【问题讨论】:

  • 您说的是滑动窗口,但您的示例似乎使用了跳跃窗口 - 是什么?

标签: c# .net system.reactive


【解决方案1】:

如果您可以依赖服务器时钟和消息中的时间戳(也就是说,我们处于“现实生活”模式),并且您会在 10 秒的滑动延迟之后与跳跃的 10 秒窗口相反,那么您可以将事件延迟 10 秒:

var events = new Subject<Event>();  
var delayedEvents = events.Delay(TimeSpan.FromSeconds(10));

检查独特事件等只是将它们添加到某种集合中的问题:

var guidSet = new HashSet<Guid>();  
delayedEvents.Do(e => guidSet.Add(e.identifier));

如果您的问题是您必须等待 10 秒,然后一次处理最后 10 秒,那么您只想缓冲 10 秒:

var bufferedEvents = events.Buffer(TimeSpan.FromSeconds(10));
bufferedEvents.Do(es => { foreach (var e in es) guidSet.Add(e.identifier); });

我没有展示滑动 10 秒窗口的示例,因为我无法想象这就是您想要的(事件被多次处理)。


现在我们开始认真了。假设您不想依赖墙上时间,而是想使用事件中的时间来驱动您​​的逻辑。假设事件被重新定义为:

 public class Event
 {
      public Guid identifier;
      public DateTime ts;
 }

创建历史调度器并从原始事件中提供已调度的事件:

var scheduler = new HistoricalScheduler();
var driveSchedule = events.Subscribe(e => scheduler.AdvanceTo(e.ts));   
var target = events.SelectMany(e => Observable.Timer(e.ts, scheduler).Select(_ => e));

现在您可以简单地在 target 上使用常规的 Rx 组合器而不是 event,并通过调度程序以便它们被适当地触发,例如:

var bufferedEvents = target.Buffer(TimeSpan.FromSeconds(10), scheduler);

这是一个简单的测试。创建一百个事件,每个事件“几乎”间隔 30 秒,但每秒实时触发:

var now = DateTime.Now;
var test = Enumerable.Range(0,99).Select(i =>
    Scheduler.ThreadPool.Schedule(
        TimeSpan.FromSeconds(i), 
        () => events.OnNext(new Event() { 
            identifier = Guid.NewGuid(), 
            ts = now.AddSeconds(i * 30) 
        })
    )
).ToList();

订阅它并请求 60 秒的缓冲事件 - 实际上每 2 个“真实”秒(60 个虚拟秒)接收 2 个事件:

target.Select(e => String.Format("{0} {1}", e.identifier, e.ts.ToString()))
      .Buffer(TimeSpan.FromSeconds(60), scheduler)
      .Select(es => String.Join(" - ", es))
      .DumpLive();

【讨论】:

  • yamen,如果我没看错,你的 MyScheduler 有错字吗?还是我需要创建这样的调度器?
  • 因此,如果我正确理解了这一点,如果我在虚拟时间只泵出 1 个事件,并在虚拟时间说缓冲 10 秒。我永远不会终止可观察的,对吧?我希望这是真的,否则我不明白。
  • 另外,您在 2 秒内收到 2 个事件(预计 60 秒内流失)的原因是因为 DumpLive 对吗?如果我订阅了它,我会在虚拟时间的第 60 秒立即收到 2 个事件?
  • MyScheduler 是对的,这是一个错字,现在已经消失了。您立即获得两个事件的原因是因为它仅在虚拟时间中。虚拟时间测试实时“快速”运行。如果您在测试中将TimeSpan.FromSeconds(i) 更改为TimeSpan.FromSeconds(i * 30),虚拟时间和实时将匹配,测试将按照真实世界运行。
  • 好的,driveSchedule 是如何使用的?我没有看到它被使用。
猜你喜欢
  • 2014-03-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-07-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多