【发布时间】:2021-06-21 18:06:22
【问题描述】:
我遇到了令我惊讶的 Scala Observables 行为。考虑下面我的例子:
object ObservablesDemo extends App {
val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
val oSlow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
val oBoth = (oFast merge oSlow).take(8)
oBoth.subscribe(println(_))
oBoth.toBlocking.toIterable.last
}
代码演示了从两个可观察对象发射元素。其中一个以“慢”的方式(每 7 秒)发射其元素,另一个以“快速”的方式(每 3 秒)发射。为了这个问题,假设我们想要使用map 函数定义这些可观察对象,并如上所示适当地映射来自interval 的数字(而不是另一种可能的方法,即同时发出项目从两个 observables 中获取速率,然后根据需要 filtering)。
代码的输出对我来说似乎违反直觉:
[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9 <-- HERE
[SLOW] 7 <-- HERE
[FAST] 12
[FAST] 15
有问题的部分是[FAST] observable 在[SLOW] observable 发出7 之前发出9。我希望7 在9 之前发出,因为在第七秒发出的任何东西都应该在第九秒发出的东西之前。
我应该如何修改代码以实现预期的行为?我查看了 RxScala 文档并开始搜索诸如不同的 interval 函数和 Scheduler 类等主题,但我不确定它是否是搜索答案的正确位置。
【问题讨论】:
-
我建议使用 fs2、Monix、AkkaStreams 或 ZIO 之类的东西RxScala。还有一个库在引擎盖下使用 Laminar (outwatch 我相信被称为)。
-
问题是你的快速可观察从
3rd秒开始,慢速从7th秒开始。因此,9以12th秒的快速发射,而7以14th秒的慢速发射。 -
您需要使用
Observable.interval(0.seconds, 3.seconds)和Observable.interval(0.seconds, 7.seconds)在0th秒处开始您的 observables。
标签: scala observable rx-java reactivex rx-scala