【发布时间】:2018-06-28 14:31:26
【问题描述】:
我有一个方法调用我认为在 IO 线程上运行的 web 服务,直到服务停止并且 UI 冻结。
所以我开始了一些简单的测试来检查线程
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'
public void test() {
disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
})
);
}
是否导致以下输出表明一切都在主线程上运行,尽管有订阅和观察?
Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main
但是 如果我将 observeOn 移到 .subscribeWith 之前,如下所示...
public void test() {
disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
})
);
}
输出是我正在寻找的东西,我必须说即使在阅读了许多关于 RxJava 的博客之后也让我感到困惑。
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main
我已经剥离了我原来的方法,直到它几乎是博客文章中示例方法的副本
这意味着这应该在 IO 线程上运行 loadPerson(),在主线程上发射。它没有。
disposableRx.add(
repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onSuccess(@NonNull String response) {
loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
isLoading.setValue(false);
}
@Override
public void onError(@NonNull Throwable e) {
loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
isLoading.setValue(false);
}
@Override
public void onComplete() {
}
})
);
从我的方法中转出线程表明它正在主线程上运行?
这是什么原因造成的?
【问题讨论】:
-
所以obervales根据您的测试在主线程上观察和订阅,还是只是观察?
-
是的,在最后一个调用 loadPersonDetailsResponse 的代码示例中订阅和观察主线程。
-
observeOn(scheduler) 的意思是“在继续之前切换到这个调度器”
标签: android multithreading observable rx-java2