处理这个问题的最优雅的方法是使用Observable 的share() 运算符。在 huuuuge 的简化中,它允许您将 observable 拆分为多个。因此,在您的情况下,表示数字流的可观察对象可以分为两个可观察对象。一个用于奇数,一个用于偶数。
假设 allNumbers(在您的示例中为 finalObservable)代表 数字流:
final Observable<Integer> allNumbers =
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.share();
Observable<Integer> oddNumbers = allNumbers.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 != 0;
}
});
Observable<Integer> evenNumbers = allNumbers.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
});
final Action1<Integer> printingAction = new Action1<Integer>() {
@Override
public void call(Integer t) {
System.out.println(Thread.currentThread() + " " + t);
}
};
evenNumbers.subscribeOn(Schedulers.computation()).subscribe(printingAction);
oddNumbers.subscribeOn(AndroidSchedulers.mainThread()).subscribe(printingAction);
并使用 Retrolambda 进行简化:
final Observable<Integer> allNumbers =
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.share();
Observable<Integer> oddNumbers = allNumbers.filter(integer -> integer % 2 != 0);
Observable<Integer> evenNumbers = allNumbers.filter(integer -> integer % 2 == 0);
final Action1<Integer> printingAction =
t -> System.out.println(Thread.currentThread() + " " + t);
evenNumbers.subscribeOn(Schedulers.computation()).subscribe(printingAction);
oddNumbers.subscribeOn(AndroidSchedulers.mainThread()).subscribe(printingAction);
您没有确切指定需要在哪个线程中处理什么,因此您可能需要更正 subscribeOn 参数并可能添加 observeOn 运算符。根据您的需要。