【问题标题】:Rxandroid What's the difference between SubscribeOn and ObserveOnRxandroid SubscribeOn 和 ObserveOn 有什么区别
【发布时间】:2017-12-12 13:48:51
【问题描述】:

我刚刚学习 Rx-java 和 Rxandroid2,我只是很困惑 SubscribeOn 和 ObserveOn 之间的主要区别是什么。

【问题讨论】:

    标签: java android rx-java rx-android rx-java2


    【解决方案1】:

    SubscribeOn 指定 Observable 将在其上运行的调度程序。 ObserveOn 指定观察者将在其上观察此 Observable 的调度程序。

    所以基本上,SubscribeOn 主要在后台线程上订阅(执行)(您不想在等待可观察对象时阻塞 UI 线程),并且在 ObserveOn 中您想在主线程上观察结果...

    如果你熟悉 AsyncTask 那么 SubscribeOn 类似于 doInBackground 方法和 ObserveOn 类似于 onPostExecute...

    【讨论】:

    • 第三次阅读此解释后,我现在清楚有什么区别。哈哈 ! +1
    • "一个观察者将观察这个 Observable。"听起来像绕口令。
    【解决方案2】:

    如果您发现 above answer 充满了行话:

    tl;dr

     Observable.just("Some string")                 
               .map(str -> str.length())              
               .observeOn(Schedulers.computation())   
               .map(length -> 2 * length)   
               .observeOn(AndroidSchedulers.mainThread())
               .subscribeOn(Schedulers.io())
               .subscribe(---)
    

    观察一个 observable... 在 IO 线程中执行 map 函数(因为我们是 "subscribingOn" 那个线程)... 现在切换到 Computation Thread并执行map(length -> 2 * length) 函数...最后确保在 (observeOn()) Main 线程上观察输出。

    无论如何

    observeOn() 只是将所有运算符的线程进一步更改为Downstream。人们通常有这种误解,认为observeOn 也充当上游,但事实并非如此。

    下面的例子会更好地解释它......

    Observable.just("Some string")                  // UI
           .map(str -> str.length())               // UI
           .observeOn(Schedulers.computation())   // Changing the thread
           .map(length -> 2 * length)            // Computation
           .subscribe(---)
    

    subscribeOn()影响 Observable 将要被订阅时将使用的线程,并且它将留在下游。

    Observable.just("Some String")              // Computation
      .map(str -> str.length())                // Computation
      .map(length -> 2 * length)              // Computation
      .subscribeOn(Schedulers.computation()) // -- changing the thread
      .subscribe(number -> Log.d("", "Number " + number));// Computation
    

    位置无所谓 (subscribeOn())

    为什么? 因为它影响只订阅时间。

    服从与subscribeOn联系的方法

    -> 基本示例:Observable.create

    create 主体中指定的所有工作都将在subscribeOn 中指定的线程上运行。

    另一个例子:Observable.just,Observable.fromObservable.range

    注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。

    如果要使用阻塞函数,请使用

    Observable.defer(() -> Obervable.just(blockingMenthod())));

    重要事实:

    subscribeOn 不适用于Subjects

    多个subscribeOn

    如果流中有多个subscribeOn的实例,只有第一个有实际效果。

    订阅和subscribeOn

    人们认为subscribeOnObservable.subscribe 有关,但并没有什么特别的关系。 它只影响订阅阶段

    来源:Tomek Polański (Medium)

    【讨论】:

    • 我很困惑,我以为我明白了,但为什么下面的答案说这个职位在subscribeOn 中也很重要。那么哪一个是真的呢?
    • @Mon 两个答案都有些正确,如果您使用多个 subscribeOn 呼叫,则 subscribeOn 的位置很重要。 当涉及到单个呼叫时,位置无关
    • 如果我在 subscribe() 中做一些 UI 工作,这会阻止我的 UI 吗?我使用了 subscribeOn(Schedulers.io()) 但我不确定。 Observable.defer() 还会阻塞主线程吗?
    • @Hilal 为了执行 UI 任务,您必须快速进入主线程,如果您要在主线程上执行认真的计算,那么它将冻结 UI,这就是我们通常在任何情况下执行任务的原因AndroidSchedulers.main() 以外的线程并将结果带到主线程以更新 UI。
    • @Hilal 谈到Observable.defer(),它与阻塞主线程无关,这部分取决于您使用的Scheduler.defer() 唯一做的就是在观察者订阅并延迟为每个观察者创建一个新的 observable 之前它不会创建 Observable。
    【解决方案3】:

    总结

    • 使用observeOn回调设置线程“在流的下游(在其下方)”,例如doOnNextmap 中的代码块。
    • 使用subscribeOn初始化“上游(上方)”设置线程,例如doOnSubscribeObservable.justObservable.create
    • 这两种方法都可以被多次调用,每次调用都会覆盖之前的调用。 位置很重要。

    让我们通过一个例子来了解这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事,因此我们很自然地会在后台线程中执行密集计算,以避免冻结应用程序。

    观察开启

    我们可以多次调用observeOn,它控制所有它下面的回调将运行哪个线程。它易于使用,并且可以按照您的预期工作。

    例如,我们会在主 UI 线程上显示一个进度条,然后在另一个线程中进行密集/阻塞操作,然后返回主 UI 线程更新结果:

        Observable.just("user1032613")
    
                .observeOn(mainThread) // set thread for operation 1
                .doOnNext {
                    /* operation 1 */
                    print("display progress bar")
                    progressBar.visibility = View.VISIBLE
                }
    
                .observeOn(backThread) // set thread for operation 2 and 3
                .map {
                    /* operation 2 */
                    print("calculating")
                    Thread.sleep(5000)
                    it.length
                }
    
                .doOnNext {
                    /* operation 3 */
                    print("finished calculating")
                }
    
                .observeOn(mainThread) // set thread for operation 4
                .doOnNext {
                    /* operation 4 */
                    print("hide progress bar and display result")
                    progressBar.visibility = View.GONE
                    resultTextView.text = "There're $it characters!"
                }
    
                .subscribe()
    

    在上面的例子中,/* operation 1 */mainThread 中运行,因为我们在其正上方使用observeOn(mainThread) 设置它;然后我们通过再次调用observeOn 切换到backThread,所以/* operation 2 */ 将在那里运行。因为我们在链接/* operation 3 */之前没有改变它,所以它也会在后面的线程中运行,就像/* operation 2 */一样;最后我们再次调用observeOn(mainThread),以确保/* operation 4 */从主线程更新UI。

    订阅开启

    所以我们学习了observeOn 为后续回调设置线程。我们还缺少什么?那么Observable本身,以及它的just()create()subscribe()等方法也是需要执行的代码。这就是对象沿流传递的方式。我们使用subscribeOn为与Observable本身相关的代码设置线程。

    如果我们删除所有回调(由前面讨论过的observeOn 控制),我们会留下“骨架代码”,默认情况下,它将在编写代码的任何线程(可能是主线程)上运行:

        Observable.just("user1032613")
                .observeOn(mainThread)
                .doOnNext {
                }
                .observeOn(backThread)
                .map {
                }
                .doOnNext {
                }
                .observeOn(mainThread)
                .doOnNext {
                }
                .subscribe()
    

    如果我们对在主线程上运行的这个空的骨架代码不满意,我们可以使用subscribeOn 来改变它。例如,第一行Observable.just("user1032613") 可能不像从我的用户名创建一个流那么简单——它可能是来自 Internet 的字符串,或者您可能正在使用 doOnSubscribe 进行其他一些密集操作。在这种情况下,您可以调用subscribeOn(backThread) 将部分代码放到另一个线程中。

    在哪里放置subscribeOn

    在写这个答案的时候,有一些误解,说“只调用一次”,“位置无关紧要”,“如果你多次调用它,只计算第一次”。经过大量研究和实验,事实证明subscribeOn 可以被多次调用。

    因为Observable 使用Builder Pattern(“一个接一个地链接方法”的花哨名称),所以subscribeOn 以相反的顺序应用。因此,该方法为上面的代码设置线程,与observeOn正好相反。

    我们可以使用doOnSubscribe 方法进行实验。该方法在订阅事件上触发,在subscribeOn设置的线程上运行:

        Observable.just("user1032613")
                .doOnSubscribe {
                    print("#3 running on main thread")
                }
                .subscribeOn(mainThread) // set thread for #3 and just()
                .doOnNext {
                }
                .map {
                }
                .doOnSubscribe {
                    print("#2 running on back thread")
                }
                .doOnNext {
                }
                .subscribeOn(backThread) // set thread for #2 above
                .doOnNext {
                }
                .doOnSubscribe {
                    print("#1 running on default thread")
                }
                .subscribe()
    

    如果您从下到上阅读上面的示例,可能会更容易遵循逻辑,就像Builder Pattern如何执行代码一样。

    在此示例中,第一行 Observable.just("user1032613")print("#3") 在同一线程中运行,因为它们之间没有更多的 subscribeOn。对于只关心just()create() 内部代码的人来说,这会产生“只有第一次调用很重要”的错觉。这个quickly falls apart once you start doing more


    脚注:

    为简洁起见,示例中的线程和print() 函数定义如下:

    val mainThread = AndroidSchedulers.mainThread()
    val backThread = Schedulers.computation()
    private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")
    

    【讨论】:

    • 我是在我的回答(同一个问题)中提到“subscribeOn() 只工作一次”的人。看了你的解释,好像很有道理。我可以用你的一些解释来更新我的答案吗?
    • 谢谢。一个后续问题,如果observeOn 控制下面它的代码,subscribeOn 控制它上面它的代码,如果它有observeOn,哪个线程将跟随代码块在它上面和subscribeOn 在它下面?抱歉,有点糊涂
    • @Mon subscribeOn 为框架“骨架”本身设置线程,observeOn 为回调函数/代码块设置线程。他们控制的不是同一件事。在您的情况下,“代码块”在 observeOn 设置的任何线程上运行。
    • 用示例代码得到了很好的响应。为了完整起见,您能否在subscribe() 中添加代码并解释它在哪个线程上运行以及subscribeOnobserveOn 对其有何影响?
    • 这不是我看到的 rxjava 3 的行为
    【解决方案4】:

    如果有人觉得 rx java 描述难以理解(比如我),这里是纯 java 的解释:

    subscribeOn()

    Observable.just("something")
      .subscribeOn(Schedulers.newThread())
      .subscribe(...);
    

    相当于:

    Observable observable = Observable.just("something");
    new Thread(() -> observable.subscribe(...)).start();
    

    因为Observablesubscribe() 上发出值,而这里subscribe() 进入单独的线程,所以这些值也在与subscribe() 相同的线程中发出。这就是为什么它在“上游”(影响先前操作的线程)和“下游”工作的原因。

    observeOn()

    Observable.just("something")
      .observeOn(Schedulers.newThread())
      .subscribe(...);
    

    相当于:

    Observable observable = Observable.just("something")
      .subscribe(it -> new Thread(() -> ...).start());
    

    这里Observable在主线程中发出值,只有监听器方法在单独的线程中执行。

    【讨论】:

      【解决方案5】:

      这个答案并不新鲜,我只是想再澄清一点。

      1. 假设我们有两个线程。

         val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") }
         val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
        

      1. 如答案所述,observeOn 将设置DownstreamsubscribeOn 将设置Upstream。但是如果两者都被使用呢?为了检查这一点,我逐行添加了日志。

        Observable.just("what if use both")
         .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) }
         .doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) }
         .map {
             Log.d("Thread", "both, map A " + Thread.currentThread().name)
             it + " A"
         }
        
         // observeOn
         .observeOn(Schedulers.from(pool1)) 
        
         .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) }
         .doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) }
         .map {
             Log.d("Thread", "both, map B " + Thread.currentThread().name)
             it + " B"
         }
        
         // subscribeOn
         .subscribeOn(Schedulers.from(pool2)) 
        
         .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) }
         .doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) }
         .map {
            Log.d("Thread", "both, map C " + Thread.currentThread().name)
            it + " C"
          }
        
         // observeOn main
         .observeOn(AndroidSchedulers.mainThread())
         .doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) }
         .subscribe(
             { result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)}
             , { error -> {} }
             )
        

      结果是:

      both, doOnSubscribe C main
      both, doOnSubscribe A Thread 2
      both, doOnSubscribe B Thread 2
      
      both, doOnNext A Thread 2
      both, map A Thread 2
      
      both, doOnNext B Thread 1
      both, map B Thread 1
      
      both, doOnNext C Thread 1
      both, map C Thread 1
      
      main main
      main subscribe main
      result: what if use both A B C
      

      如您所见,doOnSubscribe 首先调用,从下到上。这意味着subscribe 的优先级高于其他运算符,因此处理第一个代码的第一个线程是Thread 2

      然后逐行调用其他运算符。在observeOn 之后,线程更改为Thread 1。然后,就在调用subscribe 之前,再次调用observeOn,将线程更改为主线程。 (别管AndroidScheduler,它只是一种调度器)

      TL;DR;

      • 第一个路径,subscribeOn 首先调用,从下到上。
      • 第二条路径,observeOn 调用,从上到下,连同其他代码。
      • RxJava2 和 RxJava3 的行为相同

      【讨论】:

        【解决方案6】:

        当您订阅 observable 时,会启动一个流程,该流程会一直向上运行到链的顶部,然后再次返回。订阅部分与上链相关,观察部分与下链相关。

        一旦到达链的顶端,订阅阶段就基本上完成了。开始发出事件并调用映射、过滤器等的下行链。

        SubscribeOn 会影响其位置上方的 subscription 调用,例如 doOnSubscribe。

        ObserveOn 影响其位置下方的观察调用,例如 doOnNext、map、flatmap 等。

        两者都会改变用于继续向上或向下流动的线程。

        import io.reactivex.Observable;
        import io.reactivex.schedulers.Schedulers;
        
        import java.util.concurrent.CountDownLatch;
        
        public class SubscribeVsObserveOn {
        
            public static void main(String[] args) throws InterruptedException {
        
                System.out.println("Ordinal 0: " + Thread.currentThread().getName());
        
                final CountDownLatch latch = new CountDownLatch(1);
        
                Observable
                    .just("a regular string.")
                    .doOnSubscribe(disposable ->
                        System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(Schedulers.newThread())
                    .doOnNext(s ->
                        System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
                    .map(s -> s)
                    .doOnSubscribe(disposable ->
                        System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(Schedulers.newThread())
                    .doOnNext(s ->
                        System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
                    .map(s -> s)
                    .subscribe(s -> latch.countDown());
        
                latch.await();
            }
        }
        

        这是输出:

        Ordinal 0: main
        Ordinal 1: RxNewThreadScheduler-1
        Ordinal 2: RxNewThreadScheduler-2
        Ordinal 3: RxNewThreadScheduler-3
        Ordinal 4: RxNewThreadScheduler-4
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2016-10-24
          • 2019-10-11
          • 1970-01-01
          • 2018-09-14
          • 2015-08-22
          • 1970-01-01
          • 1970-01-01
          • 2015-05-03
          相关资源
          最近更新 更多