RxJava变换

终于要到牛逼的地方了,不管你激动不激动,反正我是激动了

    RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者对象

这里我们看一个例子:

Observable.just("xxx.img")
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                return null;
            }
        }).subscribe((new Action1<Bitmap>() {
    @Override
    public void call(Bitmap bitmap) {
        System.out.print(bitmap);
    }}));

上面的代码中出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。

        Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法

   可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。

不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换:

1)map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava 最常用的变换。 map() 的示意图:

RxJava源碼分析三:RxJava变换

接下来是flatMap(): 这是一个很有用但非常难理解的变换 

首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单,直接使用map方法打印就可以了。

那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程,有点类似于数据库里面的关联表)首先可以这样实现:

  Subscriber<Student> subscriber = new Subscriber<Student>() {
...
          @Override
          public void onNext(Student student) {
              List<Course> courses = student.getCourses();
              for (int i = 0; i < courses.size(); i++) {
                  Course course = courses.get(i);
                  Log.d(tag, course.getName());
              }
          }
      };

      Observable.from(students)
              .subscribe(subscriber);

我们可以在观察者(订阅者)接收数据的地方将数据取出然后进行操作

好像依然很简单。但是注意上面的subscriber接收的依然是student对象,如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)

 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化

这个时候就需要flatMap闪亮登场了,我们可以做如下工作:

Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }};

Observable.from(students)
        .flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
        })
        .subscribe(subscriber);

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。

但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中

flatMap() 的原理是这样的:

    1. 使用传入的事件对象创建一个 Observable 对象;

    2. 并不发送这个Observable, 而是将它**,于是它开始发送事件;

    3. 每一个创建出来的Observable 发送的事件,都被汇入同一个Observable,而这个Observable负责将这些事件统一交给Subscriber的回调方法

这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发下去。而这个铺平就是 flatMap() 所谓的 flat

RxJava源碼分析三:RxJava变换

扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

 
 networkClient.token()

// 在订阅时请求 token,并在响送 token
                .flatMap(new Func1<String, Observable<Messages>>() {
                    @Override
                    public Observable<Messages> call(String token) {
                        return networkClient.messages();
                    }})
                .subscribe(new Action1<Messages>() {
                    @Override
                    public void call(Messages messages) {
                        // 示消息列表
                        showMessages(messages);

                    }});

  传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰

通过上面的介绍有没有感觉到RxJava变换的神奇?接下来我们就一起来看看RxJava是如何实现这种变换的


变换的原理:lift()

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):


public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {

        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);

        }});}

注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码

注意上面的代码,我们来分析一下:

1)首先我们看到当调用Observable的lift方法的时候,返回的是一个新创建的Observable

2)然后在这个新的Observable里面我们在实现的OnSubscribe的call方法里面创建了一个新的Subscriber,并且调用了原来的Observable的onSubscribe的call方法

3)看到这里是不是有点像装饰器模式,因为这里实现的主要的功能就是在一个对象执行其方法的时候插入了一些新的逻辑,(这个对象就是onSubscribe)

也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会创建一个新的 Observable,这个新的 Observable 会像一个装饰器一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

当然新版本的具体实现代码发生了变化但是,实现逻辑没有变化,就是对装饰器模式的有效应用(或许也有闭包的影子),效果如下:

RxJava源碼分析三:RxJava变换

例如我们可以直接使用lift操作对原有数据进行变化:

 
observable.lift(new Observable.Operator<String, Integer>() {
     @Override
     public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {

         // 将事件序列中的 Integer 转换为 String          return new Subscriber<Integer>() {
             @Override
             public void onNext(Integer integer) {
                 subscriber.onNext("" + integer);
             }

........
         };}});


在上面的代码中我们将事件中的 Integer 对象转换成 String

注意:讲述 lift() 的原理只是为了让你更好地了解 RxJava ,从而可以更好地使用它

然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误


线程控制:Scheduler (二)

除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。

之前讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,subscribeOn指定事件产生的线程,observeOn指定观察者接收到事件的线程

可是在了解了 map() flatMap() 等变换方法后,我们就产生了疑问:能不能多切换几次线程?

答案是:能

其实observeOn() 指定的是它之后的操作所在的线程

因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:


Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator) // 新线程,由 observeOn() 指定
        .observeOn(Schedulers.io())
        .map(mapOperator2) // IO 线程,由 observeOn() 指定
        .observeOn(AndroidSchedulers.mainThread)
        .subscribe(subscriber);  // An

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

但是需要注意了,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的

Scheduler 的原理(二)

其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程)

因为subscribeOn方法也是创建了一个新的observable对象,然后使用一个新的SubscribeOn对象来装饰原observable对象中的SubscribeOn对象

RxJava源碼分析三:RxJava变换

observeOn() 原理图:

RxJava源碼分析三:RxJava变换

从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)

  不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响,也就是影响事件产生的地方

  而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时因此observeOn() 控制的是它后面的线程

最后,一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的:

RxJava源碼分析三:RxJava变换


图中共有 5 处含有对事件的操作。由图中可以看出,

    (1)①和②两处受第一个 subscribeOn() 影响,运行在红色线程;

    (2)③和④处受第一个 observeOn() 的影响,运行在绿色线程;

    (3)⑤处受第二个 onserveOn() 影响,运行在紫色线程;

而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用

 

延伸:doOnSubscribe()

  默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程

  例如:当我们进行网络请求开始之前需要在界面上显示一个ProgressBar,这必须在主线程执行,我们就可以使用这个方法


Observable.create(onSubscribe)
        .subscribeOn(Schedulers.io())//指定subscribe方法线        .doOnSubscribe(new Action0() {
            
            @Override
            public void call() {
                progressBar.setVisibility(View.VISIBLE); //在主线            }  })
        .subscribeOn(AndroidSchedulers.mainThread())//指定主线        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);


如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了


到此为止我们已经对Rxjava的变换有了了解,并且对RxJava的线程调度原理再次进行了说明

RxJava变换

终于要到牛逼的地方了,不管你激动不激动,反正我是激动了

    RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者对象

这里我们看一个例子:

Observable.just("xxx.img")
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                return null;
            }
        }).subscribe((new Action1<Bitmap>() {
    @Override
    public void call(Bitmap bitmap) {
        System.out.print(bitmap);
    }}));

上面的代码中出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。

        Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法

   可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。

不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换:

1)map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava 最常用的变换。 map() 的示意图:

RxJava源碼分析三:RxJava变换

接下来是flatMap(): 这是一个很有用但非常难理解的变换 

首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单,直接使用map方法打印就可以了。

那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程,有点类似于数据库里面的关联表)首先可以这样实现:

  Subscriber<Student> subscriber = new Subscriber<Student>() {
...
          @Override
          public void onNext(Student student) {
              List<Course> courses = student.getCourses();
              for (int i = 0; i < courses.size(); i++) {
                  Course course = courses.get(i);
                  Log.d(tag, course.getName());
              }
          }
      };

      Observable.from(students)
              .subscribe(subscriber);

我们可以在观察者(订阅者)接收数据的地方将数据取出然后进行操作

好像依然很简单。但是注意上面的subscriber接收的依然是student对象,如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)

 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化

这个时候就需要flatMap闪亮登场了,我们可以做如下工作:

Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }};

Observable.from(students)
        .flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
        })
        .subscribe(subscriber);

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。

但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中

flatMap() 的原理是这样的:

    1. 使用传入的事件对象创建一个 Observable 对象;

    2. 并不发送这个Observable, 而是将它**,于是它开始发送事件;

    3. 每一个创建出来的Observable 发送的事件,都被汇入同一个Observable,而这个Observable负责将这些事件统一交给Subscriber的回调方法

这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发下去。而这个铺平就是 flatMap() 所谓的 flat

RxJava源碼分析三:RxJava变换

扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

 
 networkClient.token()

// 在订阅时请求 token,并在响送 token
                .flatMap(new Func1<String, Observable<Messages>>() {
                    @Override
                    public Observable<Messages> call(String token) {
                        return networkClient.messages();
                    }})
                .subscribe(new Action1<Messages>() {
                    @Override
                    public void call(Messages messages) {
                        // 示消息列表
                        showMessages(messages);

                    }});

  传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰

通过上面的介绍有没有感觉到RxJava变换的神奇?接下来我们就一起来看看RxJava是如何实现这种变换的


变换的原理:lift()

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):


public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {

        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);

        }});}

注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码

注意上面的代码,我们来分析一下:

1)首先我们看到当调用Observable的lift方法的时候,返回的是一个新创建的Observable

2)然后在这个新的Observable里面我们在实现的OnSubscribe的call方法里面创建了一个新的Subscriber,并且调用了原来的Observable的onSubscribe的call方法

3)看到这里是不是有点像装饰器模式,因为这里实现的主要的功能就是在一个对象执行其方法的时候插入了一些新的逻辑,(这个对象就是onSubscribe)

也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会创建一个新的 Observable,这个新的 Observable 会像一个装饰器一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

当然新版本的具体实现代码发生了变化但是,实现逻辑没有变化,就是对装饰器模式的有效应用(或许也有闭包的影子),效果如下:

RxJava源碼分析三:RxJava变换

例如我们可以直接使用lift操作对原有数据进行变化:

 
observable.lift(new Observable.Operator<String, Integer>() {
     @Override
     public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {

         // 将事件序列中的 Integer 转换为 String          return new Subscriber<Integer>() {
             @Override
             public void onNext(Integer integer) {
                 subscriber.onNext("" + integer);
             }

........
         };}});


在上面的代码中我们将事件中的 Integer 对象转换成 String

注意:讲述 lift() 的原理只是为了让你更好地了解 RxJava ,从而可以更好地使用它

然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误


线程控制:Scheduler (二)

除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。

之前讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,subscribeOn指定事件产生的线程,observeOn指定观察者接收到事件的线程

可是在了解了 map() flatMap() 等变换方法后,我们就产生了疑问:能不能多切换几次线程?

答案是:能

其实observeOn() 指定的是它之后的操作所在的线程

因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:


Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator) // 新线程,由 observeOn() 指定
        .observeOn(Schedulers.io())
        .map(mapOperator2) // IO 线程,由 observeOn() 指定
        .observeOn(AndroidSchedulers.mainThread)
        .subscribe(subscriber);  // An

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

但是需要注意了,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的

Scheduler 的原理(二)

其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程)

因为subscribeOn方法也是创建了一个新的observable对象,然后使用一个新的SubscribeOn对象来装饰原observable对象中的SubscribeOn对象

RxJava源碼分析三:RxJava变换

observeOn() 原理图:

RxJava源碼分析三:RxJava变换

从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)

  不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响,也就是影响事件产生的地方

  而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时因此observeOn() 控制的是它后面的线程

最后,一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的:

RxJava源碼分析三:RxJava变换


图中共有 5 处含有对事件的操作。由图中可以看出,

    (1)①和②两处受第一个 subscribeOn() 影响,运行在红色线程;

    (2)③和④处受第一个 observeOn() 的影响,运行在绿色线程;

    (3)⑤处受第二个 onserveOn() 影响,运行在紫色线程;

而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用

 

延伸:doOnSubscribe()

  默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程

  例如:当我们进行网络请求开始之前需要在界面上显示一个ProgressBar,这必须在主线程执行,我们就可以使用这个方法


Observable.create(onSubscribe)
        .subscribeOn(Schedulers.io())//指定subscribe方法线        .doOnSubscribe(new Action0() {
            
            @Override
            public void call() {
                progressBar.setVisibility(View.VISIBLE); //在主线            }  })
        .subscribeOn(AndroidSchedulers.mainThread())//指定主线        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);


如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了


到此为止我们已经对Rxjava的变换有了了解,并且对RxJava的线程调度原理再次进行了说明

相关文章:

  • 2021-09-20
  • 2021-11-18
  • 2021-04-14
  • 2019-08-30
  • 2021-11-24
  • 2021-06-03
  • 2021-11-28
  • 2021-09-15
猜你喜欢
  • 2022-01-13
  • 2021-11-11
  • 2021-06-25
  • 2021-10-11
  • 2021-11-30
  • 2021-09-23
  • 2021-08-17
  • 2021-10-31
相关资源
相似解决方案