【问题标题】:How to run subsciber on multiple threads with RxJava on Android如何在 Android 上使用 RxJava 在多个线程上运行订阅者
【发布时间】:2015-10-21 19:57:05
【问题描述】:

我是 RxJava 的新手,正在努力解决一个(我猜的)简单问题。我想在 3 个线程中同时处理订阅部分。这就是我使用 FixedThreadPool 的原因。示例代码:

Observer.just("one", "two", "three", "four")
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))
.subscribe(new Observer<String>() {

    public void onNext(String string) {
        Log.d(TAG, "Started: " + string);
        SystemClock.sleep(1000);
        Log.d(TAG, "Ended: " + string);
    }

    (...)

}

预期结果:

Started: one
Started: two
Started: three
Ended: one
Started: four
Ended: two
Ended: three
Ended: four

实际结果:

Started: one
Ended: one
Started: two
Ended: two
Started: three
Ended: three
Started: four
Ended: four

我做错了什么?

【问题讨论】:

    标签: android multithreading rx-java rx-android


    【解决方案1】:

    RxJava Observables 是顺序的,subscribeOnobserveOn 运算符不会彼此并行运行值。

    您可以实现的最接近的事情是通过模键对值进行分组,通过observeOn 运行它们并合并结果:

    AtomicInteger count = new AtomicInteger();
    
    Observable.range(1, 100)
    .groupBy(v -> count.getAndIncrement() % 3)
    .flatMap(g -> g
        .observeOn(Schedulers.computation())
        .map(v ->  Thread.currentThread() + ": " + v))
    .toBlocking()
    .forEach(System.out::println);
    

    【讨论】:

    • 感谢您的回答。但是,我是否正确理解该解决方案为每个线程“设置单独的队列”,所以如果任务不花费相同的时间,那么最后一些线程可能会更早完成,而一个线程仍有几个任务要运行。我的问题是 RxJava 是否支持在多个线程之间使用共享队列?
    • 这个设置没有工作窃取,也没有任何共享队列。
    猜你喜欢
    • 1970-01-01
    • 2020-03-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-11
    • 2015-07-25
    相关资源
    最近更新 更多