简书地址 http://www.jianshu.com/p/2badfbb3a33b
| 描述 | RxJava 1.X | RxJava 2.X |
|---|---|---|
package包名 |
rx.xxx |
io.reactivex.xxx |
| Reactive Streams规范 |
1.X早于Reactive Streams规范出现,仅部分支持规范 |
完全支持 |
| Backpressure 背压 | 对背压的支持不完善 |
Observable设计为不支持背压新增 Flowable支持背压 |
null空值 |
支持 | 不再支持null值,传入null值会抛出 NullPointerException
|
Schedulers线程调度器 |
Schedulers.immediate()Schedulers.trampoline()Schedulers.computation()Schedulers.newThread()Schedulers.io()Schedulers.from(executor)AndroidSchedulers.mainThread()
|
移除Schedulers.immediate()新增 Schedulers.single()其它未变 |
Single |
行为类似Observable,但只会发射一个onSuccess或onError
|
按照Reactive Streams规范重新设计,遵循协议onSubscribe(onSuccess/onError)
|
Completable |
行为类似Observable,要么全部成功,要么就失败 |
按照Reactive Streams规范重新设计,遵循协议onSubscribe (onComplete/onError)
|
Maybe |
无 |
2.X新增,行为类似Observable,可能会有一个数据或一个错误,也可能什么都没有。可以将其视为一种返回可空值的方法。这种方法如果不抛出异常的话,将总是会返回一些东西,但是返回值可能为空,也可能不为空。按照Reactive Streams规范设计,遵循协议onSubscribe (onSuccess/onError/onComplete)
|
Flowable |
无 |
2.X新增,行为类似Observable,按照Reactive Streams规范设计,支持背压Backpressure
|
Subject |
AsyncSubjectBehaviorSubjectPublishSubjectReplaySubjectUnicastSubject
|
2.X依然维护这些Subject现有的功能,并新增:AsyncProcessorBehaviorProcessorPublishProcessorReplayProcessorUnicastProcessor支持背压 Backpressure
|
Subscriber |
Subscriber |
由于与Reactive Streams的命名冲突,Subscriber已重命名为Disposable
|
RxJava 2.X + Retrofit + OkHttp 简单示例点这里
library依赖变化
//1.X compile 'io.reactivex:rxjava:1.2.1' compile 'io.reactivex:rxandroid:1.2.1' //2.X compile 'io.reactivex.rxjava2:rxjava:2.0.0' compile 'io.reactivex.rxjava2:rxandroid:2.0.0'
package变化
变动主要为rx.xxx --> io.reactivex.xxx
//1.X import rx.Observable; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.schedulers.Schedulers; import rx.functions.Action1; //2.X import io.reactivex.Observable; import io.reactivex.ObservableSource; import io.reactivex.ObservableTransformer; import io.reactivex.disposables.Disposable; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.schedulers.Schedulers; import io.reactivex.functions.Consumer;
null值
RxJava 2.X不再支持null值,如果传入一个null会抛出NullPointerException
Observable.just(null); Single.just(null); Observable.fromCallable(() -> null) .subscribe(System.out::println, Throwable::printStackTrace); Observable.just(1).map(v -> null) .subscribe(System.out::println, Throwable::printStackTrace);
案例1
//1.X public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() { @Override public Object call(Object observable) { return ((Observable) observable).subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(Schedulers.io()); } }; public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer){ return (Observable.Transformer<T, T>)transformer; } Action1<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Subscription subscription = Observable.from(items) .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER)) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return Integer.valueOf(s); } }) .subscribe(onNext); //TODO subscription.unsubscribe(); //2.X public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() { @Override public ObservableSource apply(Observable upstream) { return upstream.subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(Schedulers.io()); } }; public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer){ return (ObservableTransformer<T, T>)transformer; } Consumer<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Disposable disposable = Observable.fromArray(items) .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER)) .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.valueOf(s); } }) .subscribe(onNext); //TODO disposable.dispose();
-
.subscribe(...)返回值的变化:1.X为Subscription, 2.X为Disposable -
Transformer的变化:1.X为rx.Observable内部的Transformer接口, 继承自Func1<Observable<T>, Observable<R>>, 2.X为io.reactivexObservableTransformer<Upstream, Downstream>,是一个独立的接口 -
AndroidSchedulers的变化: 1.X为rx.android.schedulers.AndroidSchedulers, 2.X为io.reactivex.android.schedulers.AndroidSchedulers -
Func1的变化: 1.X为rx.functions.Func1, 2.X为io.reactivex.functions.Function - 其它重载方法见下方截图
//1.X public class AppBaseActivity extends AppCompatActivity { ... private CompositeSubscription mCompositeSubscription; protected void addSubscription(Subscription subscription) { if (null == mCompositeSubscription) { mCompositeSubscription = new CompositeSubscription(); } mCompositeSubscription.add(subscription); } @Override protected void onDestroy() { if (null != mCompositeSubscription) { mCompositeSubscription.unsubscribe(); } super.onDestroy(); } ... } //2.X public class AppBaseActivity extends AppCompatActivity { ... private CompositeDisposable mCompositeDisposable; protected void addDisposable(Disposable disposable) { if (null == mCompositeDisposable) { mCompositeDisposable = new CompositeDisposable(); } mCompositeDisposable.add(disposable); } @Override protected void onDestroy() { if (null != mCompositeDisposable) { mCompositeDisposable.clear(); } super.onDestroy(); } ... }