Rxjava的创建操作符

操作符 用途
just() 将一个或多个对象转换成发射这个或者这些对象的一个Observable
from() 将一个Iterable、一个Future或者一个数组转换成一个Observable
create() 使用一个函数从头创建一个Obervable
defer() 只有当订阅者订阅才创建Observable,为每个订阅者创建一个新的Observab
range() 创建一个发射指定范围的整数序列的Observable
interval() 创建一个按照给定的时间间隔发射整数序列的Observavble
timer() 创建一个在给定的延时之后发射单个数据的Observable
empty() 创建一个什么都不做直接通知完成的Observable
error() 创建一个什么都不做直接通知错误的Observable
nerver() 创建一个不发射任何数据的Observable

1.create、just和from

1.1 create

使用一个函数从头开始创建一个Obervable

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i <10 ; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println(throwable.getMessage());
                throwable.printStackTrace();
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onComplete");
            }
        });

执行结果:

0
1
2
3
4
5
6
7
8
9
onComplete

1.2 just

创建一个发射指定值的Observable。

发射单个数据:

   Observable.just("Hello World")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    throwable.printStackTrace();
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });

执行结果:

Hello World
onComplete

just类似于from,但是from会将数组或Iterable的数据取出然后逐个发射,而just只是简单地原样发射,将数组或Iterable当单个参数。

它可以接受一至十个参数,返回一个按参数列表顺序发射这些数据的Observable。

    Observable.just(1,2,3,4,5,6,7,8,9,10)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println(throwable.getMessage());
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });

执行结果:

1
2
3
4
5
6
7
8
9
10
onComplete

在Rxjava2.0中,如果在just()中传入null,则会抛出一个空指针异常。

 java.lang.NullPointerException: item is null

1.3 from

from可以将其他种类的对象和数据类型转换为Observable

在Rxjava中,from操作符可以将Future、Iterable和数组转换成Observable。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。

1.数组:fromArray(T... items)

    Observable.fromArray("hello","world")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
执行结果:
hello
world

2.Iterable:fromIterable(Iterable<? extends T> source)

    Observable.fromIterable(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            });
            
执行结果:
1
2
3
4
5
6
7
8
9
10

3.fromFuture(Future<? extends T> future)

对于Future,他会发射Future.get()方法返回的数据

        ExecutorService executorService= Executors.newCachedThreadPool();
        Future<String> future=executorService.submit(new MyCallable());

        Observable.fromFuture(future)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });
                
                
    static class MyCallable implements Callable<String>{

        @Override
        public String call() throws Exception {
            System.out.println("模拟一些耗时的任务...");
            Thread.sleep(5000);
            return "OK";
        }
    }
    }        
       
       
       
       执行结果:
        模拟一些耗时的任务...
	    OK

4. fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)

指定超时时长和时间单位。如果过了指定的时长,Future还没有返回一个值,那么这个Observable就会发射错误通知并终止。

        ExecutorService executorService=Executors.newCachedThreadPool();
        Future<String> future=executorService.submit(new MyCallable());

        Observable.fromFuture(future,4,TimeUnit.SECONDS)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        throwable.printStackTrace();
                        System.out.println(throwable.getMessage());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("onComplete");
                    }
                });
                
		执行结果:
        模拟一些耗时的任务...
		java.util.concurrent.TimeoutException...
		null
        

2.repeat

repeat会重复发射数据。 repeat不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者是通过repeat(n)指定的重复次数。

1.repeat(long times)

        Observable.just("hello repeat")
                .repeat(3)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });
         执行结果:
         	hello repeat
			hello repeat
			hello repeat
		                	

2.repeatWhen

repeatWhen不是缓存和重放原始Observable的数据序列,而是有条件地重复订阅和发射原来的Observable.

将原始Observable的终止通知(完成或错误)当作一个void数据传递给一个通知处理器,以此来决定是否要重新订阅和发射原来的observable.这个通知处理器就像一个Observable操作符,接受一个发射void通知的Observable(意思是,重新订阅和发射原始Observable)作为输入,返回一个发射void数据或者直接终止(既使用repeatwhen终止发射数据)的Observable。

当apply方法返回的是Observable.empty(),Observable.error()。则不发送事件。

        Observable.range(0,9)
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                        return objectObservable.timer(10,TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("onComplete");
                    }
                });

        try {
            Thread.sleep(12000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
     	执行结果:
      
        0
        1
        2
        3
        4
        5
        6
        7
        8
        0
        1
        2
        3
        4
        5
        6
        7
        8
        onComplete

3. repeatUntil

repeatUntil表示直到某个条件就不再重复发射数据。当BooleanSupplier的getAsBoolean()返回false时,表示重复发射上游的Observable;当getAsBoolean()为true时,表示中止重复发射上游的Observab。

        long startTimeMillis=System.currentTimeMillis();
        Observable.interval(500, TimeUnit.MILLISECONDS)
                .take(5)
                .repeatUntil(new BooleanSupplier() {
                    @Override
                    public boolean getAsBoolean() throws Exception {
                        Log.e(TAG, "getAsBoolean: time:"+(System.currentTimeMillis()-startTimeMillis ));
                        return (System.currentTimeMillis()-startTimeMillis)>5000;
                    }
                }).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "accept: "+aLong );
            }
        });
        
        
        执行结果:
        2019-11-13 13:42:36.672 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 0
        2019-11-13 13:42:37.172 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 1
        2019-11-13 13:42:37.673 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 2
        2019-11-13 13:42:38.172 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 3
        2019-11-13 13:42:38.673 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 4
        2019-11-13 13:42:38.673 15936-15976/com.loan.rxjavademo E/MainActivity: getAsBoolean: time:2513
        2019-11-13 13:42:39.175 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 0
        2019-11-13 13:42:39.676 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 1
        2019-11-13 13:42:40.175 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 2
        2019-11-13 13:42:40.675 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 3
        2019-11-13 13:42:41.175 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 4
        2019-11-13 13:42:41.175 15936-16023/com.loan.rxjavademo E/MainActivity: getAsBoolean: time:5015

3. defer、interval和timer

3.1 defer

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个全新的Observable。

defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,但事实上每个订阅者获取的是他们自己单独的数据序列。

           static int i=10;
    public static void main(String[] args) {
        Observable observable=Observable.defer(new Supplier<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> get() throws Throwable {
                return Observable.just(i);
            }
        });
        i=20;
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                System.out.println(integer);
            }
        });
    }
        
        执行结果:
            20

3.2 interval

创建一个按固定时间间隔发射整数序列的Observable。

interval操作符返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列。

interval操作符默认在computation调度器上执行。

       Observable.interval(1,TimeUnit.SECONDS)
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(Long aLong) throws Exception {
                       System.out.println(aLong);
                   }
               });

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
       执行结果:
       	0
        1
        2
        3
        4
        5
        6
        7
        8

3.3 timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值。

timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable。

timer返回一个Observable,它在延迟一段给定的时间后发射一个简单地数字0.

time操作符默认在computation调度器上执行。

      Observable.timer(2,TimeUnit.SECONDS)
              .subscribe(new Consumer<Long>() {
                  @Override
                  public void accept(Long aLong) throws Exception {
                      System.out.println(aLong);
                      System.out.println("hello world");
                  }
              });

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        执行结果:
       		0
			hello world

相关文章: