rxjava操作符之条件布尔操作符:
1.条件布尔操作符的作用:通过设置函数,判断被观察者Observable发送的事件是否符合条件
2.条件布尔操作符的常见类型:
1.all()操作符
//all()操作符
//all操作符的作用是判断被观察者Observable发送的所有事件是否都满足指定的条件通过一个布尔值来返回给观察者Observer来进行判断
Observable.just(1,2,3,4,5,6,7)
.all(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer)
throws Exception
{
return integer > 10;
// return integer % 2 != 0;
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean)
throws Exception
{
Log.d(TAG, "all操作符筛选结果: " + aBoolean);
}
});
打印结果:
2.takeWhile()
//takeWhile()操作符
//takeWhile()操作符判断被观察者Observable发送的每项数据是否满足指定的条件
//如果满足就发送该事件,否则就不发送该事件 同时也不会发送该事件之后的所有的事件
Observable.just(1,2,4,5,6,24,2)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer aLong)
throws Exception
{
return aLong < 4;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer aLong)
throws Exception
{
Log.d(TAG, "满足条件的选项 : " + aLong);
}
});
打印结果:
3.skipWhile()
//skipWhile操作符
//skipWhile()操作符会判断被观察者发送的每项数据如果判断的结果为false则开始发送
//从该事件起的事件序列里面的所有的事件并且不会再判断该事件之后的事件的数据是否满足条件
//注意和takeWhile的区别:takewhile判断到某一个事件如果不满足条件返回为false不仅不发送该事件同时
//也不会发送该事件以后的所有的事件,而skipWhile则是会判断某一个事件满足的条件是否返回为false
//如果返回为false不仅发送该事件同时也不会判断该事件之后的所有事件而是直接发送该事件之间之后的所有的事件
Observable.just(11,11,2,12,3,13,5,9,15)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer aLong)
throws Exception
{
return aLong < 12;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer aLong)
throws Exception
{
Log.d(TAG, "skipWhile : " + aLong);
}
});
打印结果:
4.takeUntil()
//条件布尔操作符takeUntil()
//当判断执行到事件序列中的某个事件满足条件返回true的时候就停止发送Observable事件序列中的事件
Observable.just(1,2,10,3,13,4,15,5,18)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
//当数值大于10的时候返回true停止发送事件
return integer > 10;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 开始采用subscribe连接 thread = " + Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer + " thread = " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage() + " thread = " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: thread = " + Thread.currentThread().getName());
}
});
Log.d(TAG, "-----------------------------------------------------");
//takeUntil操作符不仅可以使用predicate返回一个布尔值来决定是否发送Observable事件序列里面的事件
//同时也可以传入一个Observable对象 即当我们传入takeUntil里面的Observable开始发送数据的时候
//原来的Observable就停止发送是事件序列里面的事件
Observable.intervalRange(1,10,0,1,TimeUnit.SECONDS) //每隔1秒发送从1开始递增的数字
.takeUntil(Observable.timer(5,TimeUnit.SECONDS)) //延迟5秒后发送一个数值0
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 开始采用subscribe连接 thread = " + Thread.currentThread().getName());
}
@Override
public void onNext(Long integer) {
Log.d(TAG, "onNext: " + integer + " thread = " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage() + " thread = " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: thread = " + Thread.currentThread().getName());
}
});
打印结果:
5. skipUntil()
//条件布尔操作符skipUntil()
//skipUntil()操作符的特点是当skipUntil()操作符传入的Observable开始发送数据的时候
//skipUntil()它才开始发射原始Observable的数据(无论skipUntil的Observable有没有发射数据其实原始的
//Observable都已经开始在发射数据了只是skipUntil给丢弃了)
//SkipUntil订阅原始的Observable,但是忽略原始Observable的发射物,
// 直到传入skipUntil()的Observable发射了一项数据那一刻,它开始发射原始Observable的数据
Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
.skipUntil(Observable.timer(3, TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,
"onSubscribe: 开始采用subscribe连接 thread = " + Thread.currentThread().getName());
}
@Override
public void onNext(Long integer) {
Log.d(TAG,
"onNext: " + integer + " thread = " + Thread.currentThread()
.getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG,
"onError: " + e.getMessage() + " thread = " + Thread.currentThread()
.getName());
}
@Override
public void onComplete() {
Log.d(TAG,
"onComplete: thread = " + Thread.currentThread()
.getName());
}
});
打印结果:
6.sequenceEqual()
//条件布尔操作符sequenceEqual()
//sequenceEqual()操作符的作用是判断两个发送数据的Observable对象
//发送的数据是否相同,如果相同返回true如果不相同则返回false
Observable.sequenceEqual(
Observable.just(1,2,3,4),
// Observable.just(1,2,3),
Observable.just(1,2,3,5)
).subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
Log.d(TAG, "sequenceEqual : " + aBoolean);
}
});
Log.d(TAG, "-----------------------------------------------------------------------");
Observable.sequenceEqual(
Observable.just(1, 2, 3, 4),
Observable.just(1, 2, 3, 5),
//我们也可以自定义BiPredecate来自定义我们的两个Observable数据的比较规则
//从而返回我们自己想要的比较结果的布尔值
new BiPredicate<Integer, Integer>() {
@Override
public boolean test(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer == integer2 || integer == integer2 + 1 || integer == integer2 - 1;
}
}
).subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
Log.d(TAG, "sequenceEqual : " + aBoolean);
}
});
打印结果:
7. contains()
//条件布尔操作符contains()
//contains()操作符判断发送的数据中是否包含指定的数据
Observable.just(1,2,3,4,5)
.contains(4)
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
Log.d(TAG, "Observable事件序列中是否包含4 : "+ aBoolean);
}
});
打印结果:
8:isEmpty()
//条件布尔操作符isEmpty()
//判断发送的数据是否为空
// Observable.just(1,2,3)
Observable.fromArray(new int[]{}) //空数组 返回false
// Observable.fromIterable(new ArrayList<Integer>()) //空集合返回true
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
Log.d(TAG, "发送的数据是否为空: "+ aBoolean);
}
});
打印结果:
9,amb()/ambArray()
//条件布尔操作符amb()
//amb()/ambArray()操作符的作用是当我们需要发送多个Observable数据时
//使用amb()/ambArray()操作符只发送先发送数据的Observable而会丢弃掉其余的
// Observable的数据发送
ArrayList<ObservableSource<Integer>> ambList = new ArrayList<>();
ambList.add(Observable.just(1,2,3).delay(2,TimeUnit.SECONDS));
ambList.add(Observable.just(4,5,6).delay(1,TimeUnit.SECONDS));
Observable.amb(ambList).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "amb : "+ integer);
}
});
Log.d(TAG, "-----------------------------------------------------");
Observable.ambArray(Observable.just(7,8,9).delay(2,TimeUnit.SECONDS)
,Observable.just(10,11,12,13).delay(3,TimeUnit.SECONDS))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "ambArray : " + integer);
}
});
打印结果:
10.defaultIfEmpty()
//条件布尔操作符defaultIfEmpty()
//在不发送任何有效事件(next事件)仅发送complete事件(不包含error事件)的情况下
//使用defaultIfEmpty()的情形下会发送一个默认值回调给观察者Observer的onNext进行调用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// e.onNext(1);
// e.onNext(2);
// e.onNext(3);
e.onComplete();
// e.onError(new Exception("发生了异常!"));
}
}).defaultIfEmpty(100)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 开始采用subscribe连接 thread = " + Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer + " thread = " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage() + " thread = " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: thread = " + Thread.currentThread().getName());
}
});
打印结果: