【问题标题】:rxjava consumes next element even after exceptionrxjava 即使在异常之后也会消耗下一个元素
【发布时间】:2014-03-04 14:09:45
【问题描述】:

有人能解释一下为什么 rxjava 在“onNext”中的异常发生之后甚至在“onError”被调用之后消耗可观察序列中的下一个元素吗?

这是我的模拟:

import java.util.concurrent.{TimeUnit, CountDownLatch}
import rx.lang.scala._

object Tests {
  val counter = new CountDownLatch(1)

  def buildStream(num: Int) = {
    Stream.range(1, num)
      .map {s =>
      println(s"[${Thread.currentThread().getId}] Taken: $s");
      s}
  }
  val stream = buildStream(10).toSeq
  stream.toObservable
    .subscribe(
      onNext = x => consume(x),
      onError = e => println(s"ERROR! $e"),
      onCompleted = () => {println("completed"); counter.countDown()}
    )

 def consume(x: Int) = {
    println(s"[${Thread.currentThread().getId}] consuming: $x")
    Thread.sleep(100)
    if (x == 5) {
      throw new Exception(s"[${Thread.currentThread().getId}] consume?! ha!")
    }
  }
  counter.await(10, TimeUnit.SECONDS)
}

其结果如下,可以看到#6项是从异常引发后的序列中取出的:

[39] consuming: 1
[39] Taken: 2
[39] consuming: 2
[39] Taken: 3
[39] consuming: 3
[39] Taken: 4
[39] consuming: 4
[39] Taken: 5
[39] consuming: 5
ERROR! java.lang.Exception: [39] consume?! ha!
[39] Taken: 6
res0: rx.lang.scala.Subscription

据我所知,从常规序列创建的可观察集合是“冷的”,只有在观察者成功处理当前序列后,才能从源序列中获取下一个项目。 我可能会怀疑一些线程问题,但这意味着“冷”不是那种不可能是真的“冷”,而且我可以清楚地看到始终使用相同的线程 ID。

那么为什么从序列中取出第 6 项?!

【问题讨论】:

    标签: scala rx-java


    【解决方案1】:

    Observable 是一个同步的Observable 这意味着Observer 不能阻止Observable 生成新元素。但是,RxJava 承诺ObserveronErroronCompleted 之后不会收到任何消息。这与您的示例一致。

    在即将到来的 0.17.0 中,RxJava 将解决这个问题。您可以在此处查看讨论:https://github.com/Netflix/RxJava/issues/802

    【讨论】:

      猜你喜欢
      • 2019-09-12
      • 2020-07-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-27
      • 2017-09-05
      相关资源
      最近更新 更多