【问题标题】:RX Java 2 contract on sequential processingRX Java 2 顺序处理合同
【发布时间】:2019-07-23 09:47:36
【问题描述】:

TL; DR - 是否保证默认情况下,在观察 Observable 发出的事件时,在任何给定时间只使用一个线程?

在我看来,RxJava2 通常是顺序的,除非通过诸如 parallel() 之类的东西以其他方式表示。即使使用observeOn/subscribeOn,我也看到有例如doOnNext() 永远不会有两个线程同时运行:

AtomicInteger counter = new AtomicInteger();

PublishSubject<Integer> testSubject = PublishSubject.create();

testSubject
.observeOn(Schedulers.io())
.doOnNext(val -> {
  if(counter.incrementAndGet() > 1)
    System.out.println("Whoa!!!!"); // <- never happens

  Thread.sleep(20);

  counter.decrementAndGet();
})
.subscribe();

for (int i = 0; i < 10000; i++) {
  Thread.sleep(10);
  testSubject.onNext(i);
}

无论我如何更改此示例 - 除非我使用 .toFlowable(...).parallel().runOn(...) 进行硬核,否则我看不到 doOnNext 同时在不同线程上运行。

我想依赖这个特性,这样我就可以忽略我的操作符中的同步问题,但是我从来没有在 RxJava2、RxJava1 甚至一般的 RX 的文档中看到它明确指定。也许我只是错过了,谁能指出合同的这一部分是在哪里描述的?

谢谢!

【问题讨论】:

    标签: java rx-java rx-java2


    【解决方案1】:

    来自http://reactivex.io/documentation/contract.html

    Observables 必须连续地向观察者发出通知(不是在 平行线)。他们可能会从不同的线程发出这些通知, 但必须有一个正式的发生前的关系 通知。

    在您的示例中,您并没有违反这个可观察到的合同。但是如果你实现Observable错误,两个线程会同时运行:

    AtomicInteger counter = new AtomicInteger();
    
        Observable.create(emitter -> {
            new Thread(() -> {
                for (int i = 0; i < 10000; i++) {
                    try {
                        Thread.sleep(1);
                        emitter.onNext(i);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
            }).start();
            for (int i = 0; i < 10000; i++) {
                Thread.sleep(1);
                emitter.onNext(i);
    
            }
        }).doOnNext(integer -> {
            if (counter.incrementAndGet() > 1)
                System.out.println("Whoaa!");
            counter.decrementAndGet();
            Thread.sleep(1);
    
        }).subscribe();
    

    看来你可以用observeOn https://github.com/ReactiveX/RxJava/issues/5550#issuecomment-325114185 来解决这个问题

    【讨论】:

    • 谢谢@zella!这似乎是我需要的。
    【解决方案2】:
    1. 删除Thread.sleep(10);轰炸主题
    2. 在 Worker 中睡眠更长时间以模拟更长时间运行的任务。
    3. 在循环下添加Thread.sleep(10000),以便您的线程等待后台线程完成工作
    4. 如果您想强制每个工作线程在新线程上运行,请更改您的线程池 .subscribeOn(Schedulers.newThread())

    【讨论】:

    • 我认为问题是“是否保证默认情况下只使用一个线程”,这不是答案。
    • 谢谢@eis - 是的,这就是问题所在。我已将其添加到原始帖子中,感谢您使其更加清晰。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-03
    • 1970-01-01
    • 2016-03-05
    • 1970-01-01
    相关资源
    最近更新 更多