【问题标题】:RxJava2 .subscribeOn .observeOn confusion. Running on Main threadRxJava2 .subscribeOn .observeOn 混乱。在主线程上运行
【发布时间】:2018-06-28 14:31:26
【问题描述】:

我有一个方法调用我认为在 IO 线程上运行的 web 服务,直到服务停止并且 UI 冻结。

所以我开始了一些简单的测试来检查线程

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'


public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })

                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}

是否导致以下输出表明一切都在主线程上运行,尽管有订阅和观察?

Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main

但是 如果我将 observeOn 移到 .subscribeWith 之前,如下所示...

public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}

输出是我正在寻找的东西,我必须说即使在阅读了许多关于 RxJava 的博客之后也让我感到困惑。

Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main

我已经剥离了我原来的方法,直到它几乎是博客文章中示例方法的副本

Multithreading like a boss

这意味着这应该在 IO 线程上运行 loadPerson(),在主线程上发射。它没有。

disposableRx.add(
        repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableMaybeObserver<String>() {

                    @Override
                    public void onSuccess(@NonNull String response) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onComplete() {

                    }
                })
);

从我的方法中转出线程表明它正在主线程上运行?

这是什么原因造成的?

【问题讨论】:

  • 所以obervales根据您的测试在主线程上观察和订阅,还是只是观察?
  • 是的,在最后一个调用 loadPersonDetailsResponse 的代码示例中订阅和观察主线程。
  • observeOn(scheduler) 的意思是“在继续之前切换到这个调度器”

标签: android multithreading observable rx-java2


【解决方案1】:

observeOn()sunbscribeOn() 以及其他运算符的顺序非常重要

  • subscribeOn() 运算符告诉源 Observable在哪个线程上发射和转换项目。

  • 小心你放置 observeOn() 操作符的地方,因为它会改变 执行工作的线程!在大多数情况下,您可能想要延迟 切换到观察线程,直到 Rx 链的最后。

    Observable.just("long", "longer", "longest")
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .map(String::length)
        .filter(length -> length == 6)
        .subscribe(length -> System.out.println("item length " + length));
    

这里的地图、过滤器和消费是在主线程

中执行的
  • observeOn()map() 之前 没有理由在 map() 运算符之上应用 observeOn() 运算符。 其实这段代码会导致NetworkOnMainThreadException!我们不想在主线程上读取 HTTP 响应 — 应该在我们切换回主线程之前完成。

  • 你也可以像这个例子一样使用多个 observeOn() 来切换线程。

     Observable.just("long", "longer", "longest")
    .doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.computation())
    .map(String::toString)
    .doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.io())
    .map(String::toString)
    .subscribeOn(Schedulers.newThread())
    .map(String::length)
    .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
    

输出:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

注意 根据这个answer subscribeOn() 不适用于下游运营商,因此它不能保证您的操作将在不同的线程上。

subscribeOn 效果顺流而下,更靠近源头 事件。

关于你的问题,我做了一个测试,结果如下

   private void testRxJava2Async() {
 io.reactivex.Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {

            Log.d(TAG,"callable (expensive assync method) was called on --> "+Thread.currentThread().getName());

            return null;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new Observer<String>() {


                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {

                    Log.d(TAG,"onNext() was called on --> "+Thread.currentThread().getName());

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

测试结果:

callable (expensive assync method) was called on --> RxCachedThreadScheduler-1
onNext() was called on--> main

【讨论】:

  • 谢谢。我已经在您的示例中使用简单的 .just 发射器和地图、平面地图等进行了订阅/观察,并且可以看到 observeOn 的位置如何影响线程。我不能在我的最后一个示例块中获得 repo.loadPersonProfile(id) 以在 IO 线程上运行并在主线程上发出。我到底错过了什么?
  • 你用哪个库到loadPersonProfile()؟
  • 此时 loadperson 被剥离,只包含线程名称的 system.out.println 和字符串的 may.just,仅此而已
  • 有趣,我认为 subscribeOn 计划到当前线程,可调用(昂贵的异步方法)被调用 --> RxCachedThreadScheduler-1。不应该在 Main 上调用它吗?
  • 还有你到底做了什么不同的事情?
【解决方案2】:

相信我,我理解你的困惑,让我一步一步解释。记住这一点,每个订阅都应该遵守。我们都知道所有订阅都应该在工作线程(IO)上执行,而观察者在主线程(UI)上执行。在第一个示例中,订阅触发,然后您说,对于此订阅,所有更新都将在主线程上传递,无论在任何转换期间是否有更多订阅。但是在第二个示例中,这就是您所说的,仅当订阅上的地图转换被转换然后在主线程上运行时才观察。把它想象成编程中的 Pre 和 Post 增量。

希望这是有道理的。 RX 框架很棒。但实际上需要现场练习才能做到正确。你亲眼见过吗?

为了确保您的更改在主线程上被推送和执行,您需要做的是添加一个中间步骤来观察更改。

Integer[] list = {6,3,2,1};
 Observable.just(list).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(value = > value  * 2)
.observeOn(Schedulers.mainThread())
.subscribeWith(...)

这个想法是 subscribeOn 只接受开始处理,例如 NetworkRequest 但不保证将值推送到同一个线程。 observeOn 说,嘿,我可以收到您最初订阅的那些值。所以为了确保值在主线程上被转换,你可以观察另一个普通线程上的变化,执行一个操作(地图|平面图等),然后观察这些变化,这将保证只有那些值放在主线程上。基本上你的意思是这样,嘿,在这些线程中执行所有繁重的处理,但是,每当计算完成时,在主线程中将这些值传递给我以继续我的工作,只有这样我才能接受,因此没有工作会永远在主线程上完成。

【讨论】:

  • 谢谢。所以在我的最后一个例子中。如何让 loadPersonProfile 方法在 IO 上运行并在 Main 上发出。你会认为一篇关于 RX 多线程的博文会给出一个正确的例子!
  • 嘿,我知道你已经接受了,但是对于新来的人,你可以使用上面的逻辑来指导你。
  • 感谢您提供更多信息和示例。这绝对有帮助。在剥离我的代码进行测试时,我愚蠢地忘记了将对 loadPerson 的调用包装在可调用对象中。如果您在没有 fromCallable 的情况下调用标准同步方法,它总是在 Main 上运行,无论 subscribeOn 是什么!
猜你喜欢
  • 2018-08-19
  • 1970-01-01
  • 1970-01-01
  • 2016-06-03
  • 1970-01-01
  • 2019-10-11
  • 1970-01-01
  • 2016-10-24
  • 2017-12-12
相关资源
最近更新 更多