RxJava 1 源码分析
准备工作:
最好有观察者模式基础,本文采用的源码是 rxjava 版本是 1.0.0
as倒入:
compile 'io.reactivex:rxjava:1.0.0'
Subscription tSubscription = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if(!subscriber.isUnsubscribed()){ subscriber.onNext("test"); subscriber.onCompleted(); } } }).subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:"+s); } });
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if(!subscriber.isUnsubscribed()){
subscriber.onNext("test");
subscriber.onCompleted();
}
}
})
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
public class Observable<T> { final OnSubscribe<T> onSubscribe; /** * Creates an Observable with a Function to execute when it is subscribed to. * <p> * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor, * unless you specifically have a need for inheritance. * * @param f * {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called */ protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("onNext:"+s);
}
});
4. 包装观察者observer对象为SubScriber对象 ,此SubScriber相当于observer的代理对象,并将代理对象传入
public final Subscription subscribe(final Observer<? super T> observer) { return subscribe(new Subscriber<T>() { @Override public void onCompleted() { observer.onCompleted(); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onNext(T t) { observer.onNext(t); } }); }
*/
public final Subscription subscribe(Subscriber<? super T> subscriber) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.empty();
}
}
subscriber.onNext("test");
subscriber.onCompleted();
RxJava 2 源码分析
二,RxJava 2 五大基本元素(无背压)
一个抽象类
通过Observable创建一个可观察的序列(create方法)
通过subscribe去注册一个观察者
是一个接口,相比 RxJava 1 新增了一个onSubscribe 方法声明
作为Observable的subscribe方法的参数
是一个接口,和RxJava 1 中的 Subscription(订阅) 的作用相当
用户取消订阅和获取当前订阅状态
作用相当于 RxJava 1 OnSubscribe ,只是独立出来了,当订阅的时候会触发此接口
在Observable内部,实际作用是向观察者发射数据
本质是对Observer 和 Subscriber的包装
三,源码分析
RxJava 2 五大基本元素(有背压)
1.Flowable(被观察者,支持背压,易流动的)
通过Flowable创建一个可观察的序列(create方法)
通过subscribe去注册一个观察者
2.subscriber
一个单独的接口,和Observer的方法类似
作为flowable的subscribe的方法参数
3.Subscription
订阅,和RxJava 1 的不同,支持背压,有用于背压的request方法
4.FlowableOnSubscribe
当订阅的时候会触发此接口调用,在Flowable内部,实际作用是向观察者发射数据
5.Emitter
一个发射数据的接口,和Observer 的方法类似,是对Observer 和 subscriber的包装
UML图:
Scheduler调度者源码分析
1.Scheduler抽象类
2.Worker --- 真正做线程调度的类
3.Action0 --- 在线程中执行的操作
4.schedule --- 实际做线程调度的方法,入参为Action0
未完待续.....