初学RxJava,大概花了差不多一周的时间,看了两本书《Java8 函数式编程》和沈哲的《RxJava 2.x 实战》。第二本没看完看了前10章,我觉得第二本书一般般。书里有些重要的概念根本没讲清楚。后面还是要多看一些大牛的博客。
我这里面主要是记录一下最简单的RxJava发送和接收数据的流程。对支持背压的Flowable不在这一节作分析,这里只分析Observable和Observer,也不涉及线程切换,对应的RxJava版本是2.2.8
目标:
1.知道Rxjava怎么把数据发送出去
2.知道RxJava怎么把数据接收过来
我先贴一段代码
以上是一个标准的Observable发送数据流,Observer接收数据流的过程。
先从Create方法看起;
可以看到create方法里面有个RxJavaPlugins.onAssembly方法,再点进去看一下
我们来看一下上面的这个方法,首先不管这个onObservableAssembly是什么,整个的函数调用其实就是返回了一个source。因为onObservableAssembly初始的时候为空,这个source是一个Observable类型的。因为Observable就是一个接口,所以这个source具体是什么类型呢?回到create方法里面,发现具体的类型是ObservableCreate类型。看一下这个ObservableCreate:
截了一小段比较重要的代码。我们可以发现这个ObservableCreate 它其实是继承自Observable,所以可以把它看做是一个Observable对象。我们看一下它的构造,它的构造要传的是一个ObservableOnSubscribe 类型的对象,看一下这个ObservableOnSubscribe
这个ObservableOnSubscribe是一个接口,它要实现的一个方法就是subscribe.
好现在我们在回到create方法中,看看这个ObservableOnSubscribe对象是怎么传过来的。
显然这个ObservableOnSubscribe对象就是我们在调用Observable的create方法的时候new 出来的。
以防忘记再截一下代码。
好了到了这一步,我们先来总结一下Observable的create方法到底做了什么?
首先create方法他是创建一个Observabe,它的目的是返回一个Observable对象。Observable是一个接口,是吧,这个Observable的具体的对象其实是ObservableCreate,他是实现了Observable的。那这个ObservableCreate对象我们是怎么得到的,我们是通过传入我们new 出来的这个ObservableOnSubscribe,把它作为ObservableCreate的构造参数构造出来的。
好了整个的Observable的create过程我们已经清楚了。好,现在我们看一下最开始我们给出的例子中的subscribe方法。subscribe的对象是一个Observer。就是一个观察者,这个Observer也是一个接口,它里面构造了四个方法,我不细说了,分别是
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
这四个方法我们很熟悉了,不做过多解释。
好现在回到正题。我们看一下这个subscribe方法,点进去看一下实现。
红色三角形标记处的代码可以忽略,点进去会发现其实它什么都没干。主要是这个subscribeActual(observer)这个方法,点进去看一下,发现他是Observable的一个抽象方法,那具体的实现在哪里呢?我们回过头来想一下,刚才我们分析create方法的时候说create方法返回的是一个Observable对象,具体的对象类型其实是ObservableCreate,他是实现了Observable的,好我们看一下这个ObservableCreate里面有没有这个subscribeActual(Observer)方法。其实刚才已经截过代码了,省的回过头再看了,我再截一遍
看一下这个subscribeActual方法。
这里面首先出现了一个CreateEmitter对象,看一下这个对象的源码。
这个CreateEmitter继承了ObservableEmitter,同时也继承了Disposable,想想这个ObservableEmitter是不是很眼熟,是的,回到我们上面的例子看一下create的调用,ObservableSubscribe对象里面subscribe方法里面的参数就是一个ObservableEmitter。
好了看一下,subscribeActual后面的两个方法调用。
observer.onScribe(parent) 显然这个方法调用就是对应我们一开始给的例子里面,Observer会调用onScribe(Disposable disposable)方法,这也解释了为什么RxJava在发送数据流,执行订阅方法的时候,候最先被回调的是Observer的onScribe方法。
接下来看一下
source.onSubscribe(parent) 这里面的source就是Observable对象,这个方法对应到我们例子里面的 new ObservableOnSubscribe()里面的onSubscribe(ObservableEmitter emitter)这个方法。因为Createmitter既是一个ObservableEmitter也是一个Disposable对象
好了现在重新整理一下,subscribe observer的做了什么事情。
这里面的subscribe是谁调用的呢?是前面分析的实现了Observable接口的ObservableCreate对象调用的。他直接调用了自己的subscribeActual方法。整个方法里面做了什么事情?第一,首先回调了Observer的onSubscribe方法,然后调用Observable(记住就是代码里面的source,也就是一个ObservableCreate对象)的subscribe(ObservableEmitter emitter)方法,我们要发送的数据流就是在这个回调中进行。假如我们现在要发送消息了,就如同例子中的一样发送一个“Hello”,此时就会调用emitter的onNext方法,这个emitter就是上面分析的Createmitter方法。这个emitter对象,就是之前Observer所注册的回调对象。
好了整个的过程就是这样的。从create一个Observable到subscribe一个Observer ,RxJava的适配器模式用的很多。文字表达有所欠缺,这里只做整理,方面后续学习。
参考:https://blog.csdn.net/zxt0601/article/details/61614799