【发布时间】:2017-12-12 13:48:51
【问题描述】:
我刚刚学习 Rx-java 和 Rxandroid2,我只是很困惑 SubscribeOn 和 ObserveOn 之间的主要区别是什么。
【问题讨论】:
标签: java android rx-java rx-android rx-java2
我刚刚学习 Rx-java 和 Rxandroid2,我只是很困惑 SubscribeOn 和 ObserveOn 之间的主要区别是什么。
【问题讨论】:
标签: java android rx-java rx-android rx-java2
SubscribeOn 指定 Observable 将在其上运行的调度程序。 ObserveOn 指定观察者将在其上观察此 Observable 的调度程序。
所以基本上,SubscribeOn 主要在后台线程上订阅(执行)(您不想在等待可观察对象时阻塞 UI 线程),并且在 ObserveOn 中您想在主线程上观察结果...
如果你熟悉 AsyncTask 那么 SubscribeOn 类似于 doInBackground 方法和 ObserveOn 类似于 onPostExecute...
【讨论】:
如果您发现 above answer 充满了行话:
tl;dr
Observable.just("Some string")
.map(str -> str.length())
.observeOn(Schedulers.computation())
.map(length -> 2 * length)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(---)
观察一个 observable... 在 IO 线程中执行 map 函数(因为我们是
"subscribingOn"那个线程)... 现在切换到 Computation Thread并执行map(length -> 2 * length)函数...最后确保在 (observeOn()) Main 线程上观察输出。
无论如何,
observeOn() 只是将所有运算符的线程进一步更改为Downstream。人们通常有这种误解,认为observeOn 也充当上游,但事实并非如此。
下面的例子会更好地解释它......
Observable.just("Some string") // UI
.map(str -> str.length()) // UI
.observeOn(Schedulers.computation()) // Changing the thread
.map(length -> 2 * length) // Computation
.subscribe(---)
subscribeOn() 仅影响 Observable 将要被订阅时将使用的线程,并且它将留在下游。
Observable.just("Some String") // Computation
.map(str -> str.length()) // Computation
.map(length -> 2 * length) // Computation
.subscribeOn(Schedulers.computation()) // -- changing the thread
.subscribe(number -> Log.d("", "Number " + number));// Computation
位置无所谓 (
subscribeOn())
为什么? 因为它影响只订阅时间。
服从与
subscribeOn联系的方法
-> 基本示例:Observable.create
create 主体中指定的所有工作都将在subscribeOn 中指定的线程上运行。
另一个例子:Observable.just,Observable.from 或 Observable.range
注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。
如果要使用阻塞函数,请使用
Observable.defer(() -> Obervable.just(blockingMenthod())));
重要事实:
subscribeOn 不适用于Subjects
多个
subscribeOn:
如果流中有多个subscribeOn的实例,只有第一个有实际效果。
订阅和
subscribeOn
人们认为subscribeOn 与Observable.subscribe 有关,但并没有什么特别的关系。
它只影响订阅阶段。
来源:Tomek Polański (Medium)
【讨论】:
subscribeOn 中也很重要。那么哪一个是真的呢?
subscribeOn 呼叫,则 subscribeOn 的位置很重要。 当涉及到单个呼叫时,位置无关。
AndroidSchedulers.main() 以外的线程并将结果带到主线程以更新 UI。
Observable.defer(),它与阻塞主线程无关,这部分取决于您使用的Scheduler。 .defer() 唯一做的就是在观察者订阅并延迟为每个观察者创建一个新的 observable 之前它不会创建 Observable。
observeOn 为回调设置线程“在流的下游(在其下方)”,例如doOnNext 或map 中的代码块。subscribeOn 为初始化“上游(上方)”设置线程,例如doOnSubscribe、Observable.just 或Observable.create。让我们通过一个例子来了解这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事,因此我们很自然地会在后台线程中执行密集计算,以避免冻结应用程序。
我们可以多次调用observeOn,它控制所有它下面的回调将运行哪个线程。它易于使用,并且可以按照您的预期工作。
例如,我们会在主 UI 线程上显示一个进度条,然后在另一个线程中进行密集/阻塞操作,然后返回主 UI 线程更新结果:
Observable.just("user1032613")
.observeOn(mainThread) // set thread for operation 1
.doOnNext {
/* operation 1 */
print("display progress bar")
progressBar.visibility = View.VISIBLE
}
.observeOn(backThread) // set thread for operation 2 and 3
.map {
/* operation 2 */
print("calculating")
Thread.sleep(5000)
it.length
}
.doOnNext {
/* operation 3 */
print("finished calculating")
}
.observeOn(mainThread) // set thread for operation 4
.doOnNext {
/* operation 4 */
print("hide progress bar and display result")
progressBar.visibility = View.GONE
resultTextView.text = "There're $it characters!"
}
.subscribe()
在上面的例子中,/* operation 1 */ 在mainThread 中运行,因为我们在其正上方使用observeOn(mainThread) 设置它;然后我们通过再次调用observeOn 切换到backThread,所以/* operation 2 */ 将在那里运行。因为我们在链接/* operation 3 */之前没有改变它,所以它也会在后面的线程中运行,就像/* operation 2 */一样;最后我们再次调用observeOn(mainThread),以确保/* operation 4 */从主线程更新UI。
所以我们学习了observeOn 为后续回调设置线程。我们还缺少什么?那么Observable本身,以及它的just()、create()、subscribe()等方法也是需要执行的代码。这就是对象沿流传递的方式。我们使用subscribeOn为与Observable本身相关的代码设置线程。
如果我们删除所有回调(由前面讨论过的observeOn 控制),我们会留下“骨架代码”,默认情况下,它将在编写代码的任何线程(可能是主线程)上运行:
Observable.just("user1032613")
.observeOn(mainThread)
.doOnNext {
}
.observeOn(backThread)
.map {
}
.doOnNext {
}
.observeOn(mainThread)
.doOnNext {
}
.subscribe()
如果我们对在主线程上运行的这个空的骨架代码不满意,我们可以使用subscribeOn 来改变它。例如,第一行Observable.just("user1032613") 可能不像从我的用户名创建一个流那么简单——它可能是来自 Internet 的字符串,或者您可能正在使用 doOnSubscribe 进行其他一些密集操作。在这种情况下,您可以调用subscribeOn(backThread) 将部分代码放到另一个线程中。
subscribeOn
在写这个答案的时候,有一些误解,说“只调用一次”,“位置无关紧要”,“如果你多次调用它,只计算第一次”。经过大量研究和实验,事实证明subscribeOn 可以被多次调用。
因为Observable 使用Builder Pattern(“一个接一个地链接方法”的花哨名称),所以subscribeOn 以相反的顺序应用。因此,该方法为上面的代码设置线程,与observeOn正好相反。
我们可以使用doOnSubscribe 方法进行实验。该方法在订阅事件上触发,在subscribeOn设置的线程上运行:
Observable.just("user1032613")
.doOnSubscribe {
print("#3 running on main thread")
}
.subscribeOn(mainThread) // set thread for #3 and just()
.doOnNext {
}
.map {
}
.doOnSubscribe {
print("#2 running on back thread")
}
.doOnNext {
}
.subscribeOn(backThread) // set thread for #2 above
.doOnNext {
}
.doOnSubscribe {
print("#1 running on default thread")
}
.subscribe()
如果您从下到上阅读上面的示例,可能会更容易遵循逻辑,就像Builder Pattern如何执行代码一样。
在此示例中,第一行 Observable.just("user1032613") 与 print("#3") 在同一线程中运行,因为它们之间没有更多的 subscribeOn。对于只关心just() 或create() 内部代码的人来说,这会产生“只有第一次调用很重要”的错觉。这个quickly falls apart once you start doing more。
为简洁起见,示例中的线程和print() 函数定义如下:
val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")
【讨论】:
subscribeOn() 只工作一次”的人。看了你的解释,好像很有道理。我可以用你的一些解释来更新我的答案吗?
observeOn 控制下面它的代码,subscribeOn 控制它上面它的代码,如果它有observeOn,哪个线程将跟随代码块在它上面和subscribeOn 在它下面?抱歉,有点糊涂
subscribeOn 为框架“骨架”本身设置线程,observeOn 为回调函数/代码块设置线程。他们控制的不是同一件事。在您的情况下,“代码块”在 observeOn 设置的任何线程上运行。
subscribe() 中添加代码并解释它在哪个线程上运行以及subscribeOn 和observeOn 对其有何影响?
如果有人觉得 rx java 描述难以理解(比如我),这里是纯 java 的解释:
Observable.just("something")
.subscribeOn(Schedulers.newThread())
.subscribe(...);
相当于:
Observable observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();
因为Observable 在subscribe() 上发出值,而这里subscribe() 进入单独的线程,所以这些值也在与subscribe() 相同的线程中发出。这就是为什么它在“上游”(影响先前操作的线程)和“下游”工作的原因。
Observable.just("something")
.observeOn(Schedulers.newThread())
.subscribe(...);
相当于:
Observable observable = Observable.just("something")
.subscribe(it -> new Thread(() -> ...).start());
这里Observable在主线程中发出值,只有监听器方法在单独的线程中执行。
【讨论】:
这个答案并不新鲜,我只是想再澄清一点。
假设我们有两个线程。
val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") }
val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
如答案所述,observeOn 将设置Downstream,subscribeOn 将设置Upstream。但是如果两者都被使用呢?为了检查这一点,我逐行添加了日志。
Observable.just("what if use both")
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map A " + Thread.currentThread().name)
it + " A"
}
// observeOn
.observeOn(Schedulers.from(pool1))
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map B " + Thread.currentThread().name)
it + " B"
}
// subscribeOn
.subscribeOn(Schedulers.from(pool2))
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map C " + Thread.currentThread().name)
it + " C"
}
// observeOn main
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) }
.subscribe(
{ result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)}
, { error -> {} }
)
结果是:
both, doOnSubscribe C main
both, doOnSubscribe A Thread 2
both, doOnSubscribe B Thread 2
both, doOnNext A Thread 2
both, map A Thread 2
both, doOnNext B Thread 1
both, map B Thread 1
both, doOnNext C Thread 1
both, map C Thread 1
main main
main subscribe main
result: what if use both A B C
如您所见,doOnSubscribe 首先调用,从下到上。这意味着subscribe 的优先级高于其他运算符,因此处理第一个代码的第一个线程是Thread 2。
然后逐行调用其他运算符。在observeOn 之后,线程更改为Thread 1。然后,就在调用subscribe 之前,再次调用observeOn,将线程更改为主线程。 (别管AndroidScheduler,它只是一种调度器)
TL;DR;
subscribeOn 首先调用,从下到上。observeOn 调用,从上到下,连同其他代码。【讨论】:
当您订阅 observable 时,会启动一个流程,该流程会一直向上运行到链的顶部,然后再次返回。订阅部分与上链相关,观察部分与下链相关。
一旦到达链的顶端,订阅阶段就基本上完成了。开始发出事件并调用映射、过滤器等的下行链。
SubscribeOn 会影响其位置上方的 subscription 调用,例如 doOnSubscribe。
ObserveOn 影响其位置下方的观察调用,例如 doOnNext、map、flatmap 等。
两者都会改变用于继续向上或向下流动的线程。
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
public class SubscribeVsObserveOn {
public static void main(String[] args) throws InterruptedException {
System.out.println("Ordinal 0: " + Thread.currentThread().getName());
final CountDownLatch latch = new CountDownLatch(1);
Observable
.just("a regular string.")
.doOnSubscribe(disposable ->
System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
.map(s -> s)
.doOnSubscribe(disposable ->
System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
.map(s -> s)
.subscribe(s -> latch.countDown());
latch.await();
}
}
这是输出:
Ordinal 0: main
Ordinal 1: RxNewThreadScheduler-1
Ordinal 2: RxNewThreadScheduler-2
Ordinal 3: RxNewThreadScheduler-3
Ordinal 4: RxNewThreadScheduler-4
【讨论】: