参考资料 https://mcxiaoke.gitbooks.io/rxdocs/content/
1.buffer
根据缓冲容量大小发送新的观察者对象,接收的是缓冲区元素组成的集合体。
Observable.just("111", "222", "333", "444", "555")
.buffer(3)
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
Log.d("buffer", "onNext: 接收的数据--->" + strings.size() + " " + strings);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/**
* 输出结果
* 03-27 14:24:30.048 14508-14508/com.example.testlink D/buffer: onNext: 接收的数据--->3 [111, 222, 333]
* 03-27 14:24:30.048 14508-14508/com.example.testlink D/buffer: onNext: 接收的数据--->2 [444, 555]
*/
如果上游发射对象是这样的:
int[] is = new int[]{1,2,3,4,5,6,7};
Observable.fromArray(is).buffer(3)
那么下游接收的数据是:
/** * 03-27 14:33:14.358 14596-14596/com.example.testlink D/buffer: onNext: 接收的数据--->1 [[[email protected]] */
结果最终的输出集合体大小是1,内容是一个对象地址值。因为上游只发射一个数据过来。
2.Map
可以将任意发射数据转换成任意数据类型或对象,灵活度高。
Observable.just("100").map(new Function<String, Integer >() {
@Override
public Integer apply(String s) throws Exception {
Log.d("map", "apply: " + s);
return 10086;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d("map", "onNext:接收的数据---> " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/**
* 将上游发射的 String 类型 转换成 integer 数据类型后发射给下游接收。
* 03-27 15:36:07.138 15066-15066/com.example.testlink D/apply: apply: 100
* 03-27 15:36:07.138 15066-15066/com.example.testlink D/map: onNext:接收的数据---> 10086
*/
3.flatMap
同样是类型转换,与Map相比的区别就是:Map转换后发射的数据类型可以是任意,但是flatMap转换后反射出去的是一个新的观察者对象:ObservableSource。
List<String> list2 = new ArrayList<>();
list2.add("资源a");
list2.add("资源b");
list2.add("资源c");
list2.add("资源d");
user user2 = new user("yy", "20", list2);
Observable.just(user2).flatMap(new Function<user, ObservableSource<user>>() {
@Override
public ObservableSource<user> apply(user s) throws Exception {
Log.d("flatMap", "apply: " + s.age);
return Observable.just(new user(s.age));
}
}).subscribe(new Observer<user>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(user o) {
Log.d("flatMap", "onNext:接收的数据---> " + o.name);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/**
* 03-27 15:58:52.458 15434-15434/? D/flatMap: apply: 20
* 03-27 15:58:52.458 15434-15434/? D/flatMap: onNext:接收的数据---> pp
*/
简单user类:
public class user {
public String name;
public String age;
public List<String> nets;
public user(String name, String age, List<String> nets) {
this.age = age;
this.name = name;
this.nets = nets;
}
public user(String age) {
this.age = age;
this.name = "pp";
this.nets = null;
}
}
上面逻辑是将一个user对象同过flatMap拿到发射user对象的age,赋值给一个新的user对象,最后打印新的user的name。当然也可以转换成另外的ObservableSource类型数据。
4.GroupBy
直接字面上的意思理解就是分组。根据某种条件将发射出去的数据进行排序。
Observable.just(0, 1, 2, 3, 4, 5, 6)
.groupBy(new Function<Integer, Boolean>() {
@Override
public Boolean apply(Integer integer) throws Exception {
return integer % 2 == 0;
}
}).subscribe(new Observer<GroupedObservable<Boolean, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(GroupedObservable<Boolean, Integer> object) {
// Log.d("groupBy", "accept1:接收的数据类型---> " + object.getKey());
object.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("groupBy", "accept2:接收的数据类型---> "+object.getKey() +" --> "+ integer);
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/**
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 0
* com.example.testlink D/groupBy: accept2:接收的数据类型---> false --> 1
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 2
* com.example.testlink D/groupBy: accept2:接收的数据类型---> false --> 3
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 4
* com.example.testlink D/groupBy: accept2:接收的数据类型---> false --> 5
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 6
*/
5.scan
看官方解释:
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source * ObservableSource, then feeds the result of that function along with the second item emitted by the source * ObservableSource into the same function, and so on until all items have been emitted by the source ObservableSource, * emitting the result of each of these iterations. * <p>
这个意识大概说返回一个观察对象,这个对象通过某种方法先应用在第一个发射数据,然后将这个结果应用在发射的第二项数据上,直到最后一个发射数据。假定给他定义的方法是加法,每次发射的数据源是int 值,那么最终的结果就是全部发射数据的和了。
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d("scan", "integer: " + integer + " integer2= " + integer2);
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d("scan", "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/**
* /com.example.testlink D/scan: onNext: 1
/com.example.testlink D/scan: integer: 1 integer2= 2
/com.example.testlink D/scan: onNext: 3
/com.example.testlink D/scan: integer: 3 integer2= 3
/com.example.testlink D/scan: onNext: 6
/com.example.testlink D/scan: integer: 6 integer2= 4
/com.example.testlink D/scan: onNext: 10
/com.example.testlink D/scan: integer: 10 integer2= 5
/com.example.testlink D/scan: onNext: 15
*/
这里第一次打印没有走加法运算,而是直接到next里面去了,第二次才开始走加法运算,第一个参数是这个方法产生的结果,第二个参数是发射的数据源。
6.window
Observable.just(1, 2, 3, 4, 5, 6, 7)
.window(5)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Observable<Integer> ob) {
ob.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("window", "accept: 接受的数据----> " + integer);
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/**
* 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 1
03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 2
03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 3
03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 4
03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 5
03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 6
03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 7
*/
emmm,看到这个日志是懵的,没看出什么特别的地方。查阅一下文档后从下面这个图来理解的话,window作用是根据count的数量将发射的数据源分组。
上面就是一些常用到的转换的操作符,列举的例子都是很简单的举例。当然这并不是所有的转换操作符都在这儿,没有列举的,碰到的在以后慢慢探索。
每天进步一点点!U_QAQ_U