【问题标题】:How to preserve the order of items emitted by two observables after they are merged?合并后如何保留两个可观察对象发出的项目的顺序?
【发布时间】:2021-06-21 18:06:22
【问题描述】:

我遇到了令我惊讶的 Scala Observables 行为。考虑下面我的例子:

object ObservablesDemo extends App {

  val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
  val oSlow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
  val oBoth = (oFast merge oSlow).take(8)

  oBoth.subscribe(println(_))
  oBoth.toBlocking.toIterable.last

}

代码演示了从两个可观察对象发射元素。其中一个以“慢”的方式(每 7 秒)发射其元素,另一个以“快速”的方式(每 3 秒)发射。为了这个问题,假设我们想要使用map 函数定义这些可观察对象,并如上所示适当地映射来自interval 的数字(而不是另一种可能的方法,即同时发出项目从两个 observables 中获取速率,然后根据需要 filtering)。

代码的输出对我来说似乎违反直觉:

[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9   <-- HERE
[SLOW] 7   <-- HERE
[FAST] 12
[FAST] 15

有问题的部分是[FAST] observable 在[SLOW] observable 发出7 之前发出9。我希望79 之前发出,因为在第七秒发出的任何东西都应该在第九秒发出的东西之前。

我应该如何修改代码以实现预期的行为?我查看了 RxScala 文档并开始搜索诸如不同的 interval 函数和 Scheduler 类等主题,但我不确定它是否是搜索答案的正确位置。

【问题讨论】:

  • 我建议使用 fs2MonixAkkaStreamsZIO 之类的东西RxScala。还有一个库在引擎盖下使用 Laminar outwatch 我相信被称为)
  • 问题是你的快速可观察从3rd 秒开始,慢速从7th 秒开始。因此,912th 秒的快速发射,而714th 秒的慢速发射。
  • 您需要使用 Observable.interval(0.seconds, 3.seconds)Observable.interval(0.seconds, 7.seconds)0th 秒处开始您的 observables。

标签: scala observable rx-java reactivex rx-scala


【解决方案1】:

这看起来应该是这样的。这里列出了秒数和事件。您可以使用TestObserverTestScheduler 验证RXScala 中是否可用。 RXScala 在 2019 年停产,因此请记住这一点。

Secs   Event
-----------------
1
2
3      [Fast] 0
4
5
6      [Fast] 3
7      [Slow] 0
8
9      [Fast] 6
10
11
12     [Fast] 9
13
14     [Slow] 7
15     [Fast] 12
16
17
18     [Fast] 15
19
20
21     [Fast] 18

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-01-01
    • 1970-01-01
    • 2020-02-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多