RxJava 1 源码分析

准备工作:

最好有观察者模式基础,本文采用的源码是 rxjava 版本是 1.0.0

as倒入:

compile 'io.reactivex:rxjava:1.0.0'

一,示例代码:

Subscription tSubscription = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
                if(!subscriber.isUnsubscribed()){
                        subscriber.onNext("test");
                        subscriber.onCompleted();
                }
        }
}).subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
                System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
                System.out.println("onNext:"+s);
        }
});



二,RxJava 1 五大基本元素

1.Observable (被观察者,观察得到的)
   是一个类
   通过Observable创建一个可观察的序列(create方法)
   通过subscribe去注册一个观察者
2.Observer  (观察者,用于接收数据)
   是一个接口 有 3个方法声明分别是onCompleted,onError,onNext
   作为Observable的subscribe方法的参数
3.Subscription  (订阅,用户描述被观察者和观察者之间的关系)
   是一个接口 有 2个方法声明分别是unsubscribe 解除订阅,inUnsubscribed 判断是不是解除订阅了
   用于取消订阅和获取当前的订阅状态。
4.OnSubscribe (当订阅时会触发此接口的掉用)
   是一个接口 实现了action1 接口,actiong1 里面只有一个方法 call()
   在Observable内部,实际作用是向订阅者发射数据。
5.Subscriber(实现了Observer和Subscription)
   是一个抽象类实现了Observer、Subscription 接口,并且实现了Subscription接口的方法 Observer接口方法未实现

三,源码分析

代码1  (以下依次计数)
Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
                if(!subscriber.isUnsubscribed()){
                        subscriber.onNext("test");
                        subscriber.onCompleted();
                }
        }
})
1.创建一个OnSubscribe对象传入Observable.create();
代码2:
public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}
2.hook.onCreate(f) 只是返回了 f 对象也就是1中创建的OnSubscribe对象,即使创建被观察者对象 Observable 的入参f ,并且将1中创建的OnSubscribe对象在Observable 的构造函数中赋值给this.onSubscribe成员变量
代码3:
public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    /**
     * Creates an Observable with a Function to execute when it is subscribed to.
     * <p>
     * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
     * unless you specifically have a need for inheritance.
     * 
     * @param f
     *            {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }



3. 调用2中创建的被观察者对象的subscribe(参数为观察者对象 Observer)方法并且传入观察者Observer 对象

}).subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
                System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
                System.out.println("onNext:"+s);
        }
});
4. 包装观察者observer对象为SubScriber对象 ,此SubScriber相当于observer的代理对象,并将代理对象传入
     subscribe(参数为观察者对象 Observer的代理对象SubScriber)方法。
public final Subscription subscribe(final Observer<? super T> observer) {
    return subscribe(new Subscriber<T>() {

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }

    });
}


5.  subscribe方法(参数为subscriber)代码如下
 */
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }
    
    // new Subscriber so onStart it
    subscriber.onStart();
    
    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(this, onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (OnErrorNotImplementedException e2) {
            // special handling when onError is not implemented ... we just rethrow
            throw e2;
        } catch (Throwable e2) {
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.empty();
    }
}

hook.onSubscribeStart(this,onSubscribe) 方面只是返回了2中的 this.onSubscribe 成员变量即1中创建的 
OnSubscribe 对象,所以hook.onSubscribeStart(this,onSubscribe).call(SubScriber) 相对于调用了代码1中我们自己写call 方法 并且传入了观察者对象Observer 的代理对象,
call方中调用了观察者对象Observer 的代理对象的onNext和onCompleted 方法如下:
                        subscriber.onNext("test");
                        subscriber.onCompleted();

即调用到代码4中我们自己定义的观察者对象Observer的方法



四,UML图:

RxJava 源码分析 (RxJava 1 ,RxJava 2)

五,总结

   最后示例代码抽象可以理解为,创建了一个被观察者对象,并且传入了观察者对象,然后在被观察者对象中调用了观察者对象方法,是一个观察者模式的实现。

            



                                                           

                       RxJava 2 源码分析


二,RxJava 2 五大基本元素(无背压)

1.Observable  (被观察者,不支持背压)

   一个抽象类

  通过Observable创建一个可观察的序列(create方法)

  通过subscribe去注册一个观察者

2.Observer  (观察者,用于接收数据)

   是一个接口,相比 RxJava 1 新增了一个onSubscribe 方法声明 

   作为Observable的subscribe方法的参数

3.Disposable 

   是一个接口,和RxJava 1 中的 Subscription(订阅) 的作用相当

   用户取消订阅和获取当前订阅状态

4. ObservableOnSubscribe    

   作用相当于 RxJava 1  OnSubscribe ,只是独立出来了,当订阅的时候会触发此接口

   在Observable内部,实际作用是向观察者发射数据

5.Emitter 
   是一个发射数据的接口,和Observer的方法类似

   本质是对Observer 和 Subscriber的包装


三,源码分析




RxJava 2 五大基本元素(有背压)

1.Flowable(被观察者,支持背压,易流动的)

  通过Flowable创建一个可观察的序列(create方法)

   通过subscribe去注册一个观察者

2.subscriber

  一个单独的接口,和Observer的方法类似

  作为flowable的subscribe的方法参数

3.Subscription 

  订阅,和RxJava 1 的不同,支持背压,有用于背压的request方法

4.FlowableOnSubscribe

   当订阅的时候会触发此接口调用,在Flowable内部,实际作用是向观察者发射数据

5.Emitter

   一个发射数据的接口,和Observer 的方法类似,是对Observer 和 subscriber的包装

UML图:

RxJava 源码分析 (RxJava 1 ,RxJava 2)


RxJava 源码分析 (RxJava 1 ,RxJava 2)


                        Scheduler调度者源码分析

1.Scheduler抽象类

2.Worker   --- 真正做线程调度的类

3.Action0  --- 在线程中执行的操作

4.schedule --- 实际做线程调度的方法,入参为Action0


未完待续.....





相关文章:

  • 2021-10-11
  • 2021-11-30
  • 2021-07-14
  • 2022-12-23
  • 2022-12-23
  • 2021-08-17
  • 2021-10-21
  • 2021-09-10
猜你喜欢
  • 2021-08-10
  • 2022-12-23
  • 2021-10-31
  • 2022-12-23
  • 2021-09-23
  • 2021-11-11
  • 2021-06-25
相关资源
相似解决方案