【问题标题】:RX - Group/Batch bursts of elements in an observable sequenceRX - 可观察序列中元素的组/批量突发
【发布时间】:2015-07-03 17:39:35
【问题描述】:

我有一个可观察的序列。当插入第一个元素时,我想启动一个计时器并在计时器的时间跨度内批处理后续插入的元素。然后,在序列中插入另一个元素之前,计时器不会再次启动。

所以是这样的:

--------|=====timespan====|---------------|=====timespan====|-------------->
        1  2 3 4    5                     6 7            8

会产生:

[1,2,3,4,5], [6,7,8] 

我尝试使用 Observable.Buffer() 和时间跨度,但从我的实验中,我可以看到,一旦我们订阅了 observable 序列,计时器就会启动,并且在前一个计时器完成后立即重新启动。

因此,与上一个示例具有相同的序列并使用具有时间跨度的 Buffer(),我会得到这样的结果:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
        1  2 3 4    5                      6 7           8

会产生这个:

[1,2,3,4], [5], [6,7], [8]

这是我使用 Buffer 测试此行为的方法:

var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
                               Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
                               Observable.Timer(TimeSpan.FromSeconds(3)).Select(o => 3),
                               Observable.Never<int>());

Console.WriteLine("{0} => Started", DateTime.Now);
source.Buffer(TimeSpan.FromSeconds(4))
      .Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));

输出:

4/24/2015 7:01:09 PM => Started
4/24/2015 7:01:13 PM => []
4/24/2015 7:01:17 PM => [1,2]
4/24/2015 7:01:21 PM => [3]
4/24/2015 7:01:25 PM => []
4/24/2015 7:01:29 PM => []
4/24/2015 7:01:33 PM => []

有人知道如何做到这一点吗?提前致谢!

【问题讨论】:

  • 这看起来像 c#。为什么你有 [rx-java] 标签?
  • 我猜Java或C#中的逻辑是相同的。如果没有,请告诉我,我将删除标签。谢谢!
  • 现在有更多的 Rx 框架适用于无数种语言。我不知道它们有多相似。
  • 我删除了标签 [rx-java] 以避免任何混淆

标签: c# system.reactive observable


【解决方案1】:

试一试:

var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
                           Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
                           Observable.Timer(TimeSpan.FromSeconds(4)).Select(o => 3),
                           Observable.Never<int>());

Console.WriteLine("{0} => Started", DateTime.Now);
source
    .GroupByUntil(x => 1, g => Observable.Timer(TimeSpan.FromSeconds(4)))
    .Select(x => x.ToArray())
    .Switch()
    .Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));

我不得不更改您的第三个计时器的测试代码持续时间,以确保该值在分组计时器之外。

【讨论】:

  • 我尝试使用多个值进行验证,结果完全符合预期!谢谢!!
  • 是否有可能因此丢失数据?不同线程(Observable.Timer)生成的源序列,会不会有竞速条件?
  • @Absolom - 我认为它不会丢失任何值,因为每个操作员都在单个序列中工作。
猜你喜欢
  • 1970-01-01
  • 2018-11-26
  • 1970-01-01
  • 1970-01-01
  • 2022-01-23
  • 2020-03-23
  • 1970-01-01
  • 1970-01-01
  • 2014-01-28
相关资源
最近更新 更多