【问题标题】:RxJava async subscribe will lost messageRxJava 异步订阅将丢失消息
【发布时间】:2017-06-24 05:21:57
【问题描述】:

我正在学习 RxJava 异步订阅是如何工作的。但是有些问题让我很困惑。

@Test public void testCreateAsync() throws InterruptedException {
  Observable<String> observable = Observable.create(emitter -> {
    for (int i = 0; i < 10; i++) {
      if (!emitter.isDisposed()) {
        int finalI = i;
        new Thread(() -> emitter.onNext("value_" + finalI)).start();
      }
    }
    if (!emitter.isDisposed()) {
      emitter.onComplete();
    }
  });

  observable.subscribe(System.out::println);
}

上面的代码运行良好,并将value_1 打印到value_9。但是当我在订阅前添加睡眠时,不会打印最后一条消息value_9,如下所示:

@Test public void testCreateAsync() throws InterruptedException {
  ...
  Thread.sleep(3000);
  observable.subscribe(System.out::println);
}

感谢任何关于这个问题的讨论。

ps:java版本为1.8,RxJava版本为2.1.1

【问题讨论】:

    标签: rx-java2


    【解决方案1】:

    有两个问题:

    1. 同时从不同线程调用onNext
    2. onComplete 可能会在之前的线程能够发出它们的值之前执行,这会阻止任何进一步的onNext 发射。

    sleep 没有特别的效果,重复执行上面的代码有时会产生这种行为。

    (我不清楚您想通过此设置实现什么目标)。

    【讨论】:

    • 其实没什么意义,只是一个demo而已。感谢您的回答。我忽略了并发问题。
    猜你喜欢
    • 2019-12-11
    • 1970-01-01
    • 1970-01-01
    • 2016-12-11
    • 1970-01-01
    • 2016-07-10
    • 1970-01-01
    • 2019-03-19
    • 1970-01-01
    相关资源
    最近更新 更多