【问题标题】:Which thread does subscriber's onNext() method run on?订阅者的 onNext() 方法在哪个线程上运行?
【发布时间】:2016-02-23 10:33:13
【问题描述】:

我有一个创建新 Observable 的用例:

Observable.create(new Observable.OnSubscribe<String>() {
   @Override
   public void call(Subscriber<? super String> subscriber) {
      final RequestFuture<String> futureRequest = RequestFuture.newFuture();
      try{
         //getResult() is a sync time consuming http connection
         String response = getResult();
         subscriber.onNext( response );
         subscriber.onCompleted();
         Log.e("call method","Thread is about to end" + Thread.currentThread().getId());
      }
      catch ( Exception e){
         subscriber.onError( e );
      }

   }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

如您所见,getResult() 方法是一个耗时的 http 调用,并且我以同步的方式执行此操作。我在 io 线程上订阅它并在 Android 的主线程上观察。

当我检索 observable 并订阅它时:

subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.e("on completed method", "Thread is ending "+ Thread.currentThread().getId());

                    }

                    @Override
                    public void onError(Throwable e) {
                        Toast.makeText( getContext(), "failed!",Toast.LENGTH_LONG ).show();
                    }

                    @Override
                    public void onNext(String s) {
                        Log.e("on Next method", "Thread is about to end" + Thread.currentThread().getId());

                    }
                }); 

奇怪的是我可以看到 call() 方法在其 id 为 372 的线程上运行,而订阅者的 onNext() 方法在线程 1 上运行,我认为它是来自 android 的 UiThread。

那么实际上 Rxjava 是如何处理这个线程变化的呢?并证明 call 方法中的订阅者不是 subscribe() 方法中使用的那个?

【问题讨论】:

  • onNext() 运行在 UiThread 我想,这个 onNext() 方法的实现在哪里?
  • 从您的实现中,callSchedulers.io() 线程之一上完成,onNext 在主线程上完成。不同的订阅者是什么意思?
  • “subscriber.onNext(response);”在 call() 和 subscribe() 方法中的订阅者,它们是不同的对象吗?
  • 它们是,因为您传递给subscribe() 的订阅者被包裹在SafeSubscriber 中。如果您将subscriber 中的call() 转换为SafeSubscriber 并在其上调用SafeSubscriber#getActual,您将获得原始对象。

标签: android multithreading rx-java rx-android


【解决方案1】:

默认情况下,订阅者的 onNext 将在您调用 .subscribe 的同一线程上运行。

在您的代码中,您有两个线程更改:

第一个是 subscribeOn(IO),它将在 IO 中运行 .create 和链的其余部分。 之后你有observeOn(mainThread),它将把以下所有操作都更改为mainThread,包括.subscribe()中的最终方法。

希望这能让您更好地了解这两个运算符的工作原理:https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2#.t6uagyarn

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多