【发布时间】:2015-03-05 14:30:41
【问题描述】:
我有一个定期触发的事件。假设处理事件需要大约 1 秒。我不想为每个收到的事件等待 1 秒,而是想累积事件 直到最后一个处理完成。处理完成后,我想处理上次处理期间收到的事件数据:
e1 e2 e3 e4 e5 e6 e7 events happening
---------------------------------------------------------------------------------------------------------------------------------------------------> time
1s 2s 3s 4s 5s 6s
p(e1) p(e2, e3) p(e4) p(e5, e6) p(e7)
[-----------------------][-----------------------] [-----------------------][-----------------------][-----------------------] processing of items
In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 -
is finished the processing of the events e2 and e3 takes place.
This proces is similar to a rolling build: a changeset is checked in, the buildserver starts building and once the build is finished all changesets that have been
checked in during the build will then be processed.
我应该如何使用 Rx 做到这一点?
我曾尝试将缓冲区与打开和关闭选择器结合使用,但我无法正确使用。任何示例或方向表示赞赏!
让我们假设Subject<int> 作为输入流。
我已经尝试过这样的事情,但我完全迷失了。
var observer1 = input
.Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
.Subscribe(ev =>
{
bc.OnNext(true);
String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString())).Dump());
Thread.Sleep(300);
bc.OnNext(false);
});
【问题讨论】:
-
我喜欢滚动构建类比。以后解释这种情况时我会自己使用它!
标签: c# .net system.reactive reactive-programming