【发布时间】:2021-04-22 17:35:45
【问题描述】:
如何将两者结合起来,使其并行或串行运行?
Observable<Data>) dataRequestOne;
Observable<Data>) dataRequestTwo;
【问题讨论】:
标签: android optimization rx-java observable
如何将两者结合起来,使其并行或串行运行?
Observable<Data>) dataRequestOne;
Observable<Data>) dataRequestTwo;
【问题讨论】:
标签: android optimization rx-java observable
平行:
Observable.merge(dataRequestOne, dataRequestTwo)
序列号:
Observable.concat(dataRequestOne, dataRequestTwo);
【讨论】:
merge 仅在可观察对象位于不同线程上时才能并行工作。
试试 zip 运算符:http://reactivex.io/documentation/operators/zip.html
Observable.zip(dataRequestOne, dataRequestTwo, new Func2<Data, Data, Object>() {
@Override
public Object call(Data o, Data o2) {
return null;
}
});
【讨论】:
如果您想在 onNext 中对每个请求使用串行响应,则需要使用合并运算符
http://reactivex.io/documentation/operators/merge.html
Observable.mergeDelayError(
dataRequestOne.subscribeOn(Schedulers.newThread()),
dataRequestTwo.subscribeOn(Schedulers.newThread())
)
【讨论】:
要实现这一点,您可以这样做:
Observable.just(dataRequestOne, dataRequestTwo).flatMap(new Func1<Data, Observable<Data>>() {
@Override
public Observable<Data> call(Data data) {
return Observable.just(data)
.subscribeOn(Schedulers.io);
}
});
现在每个请求都将在单独的线程上处理。 FlatMap 还有一个可选的 maxConcurrent 参数,所以如果你有更多的 observable,你可以在必要时控制处理请求的线程数量。
【讨论】:
使用merge一次并行运行2个observables的示例
val parallelObservables = mutableListOf<Single<String>>()
val executor = Executors.newFixedThreadPool(2)
val schedulers = Schedulers.from(executor)
for (i in 0..4) {
val observable = Single.create<String> {
Thread.sleep(2000)
it.onSuccess("" + i + " " + Thread.currentThread().name)
}.subscribeOn(schedulers) // need to subscribe here
parallelObservables.add(observable)
}
Log.i("TAG", "start")
Single.merge(parallelObservables)
.subscribe {
Log.i("TAG", "complete $it")
}
}
Output
2021-04-22 16:31:51.790 32016-32016/com.example.androidrxtest I/TAG: start
2021-04-22 16:31:53.891 32016-32055/com.example.androidrxtest I/TAG: complete 0 pool-1-thread-1
2021-04-22 16:31:53.894 32016-32056/com.example.androidrxtest I/TAG: complete 1 pool-1-thread-2
2021-04-22 16:31:55.894 32016-32055/com.example.androidrxtest I/TAG: complete 2 pool-1-thread-1
2021-04-22 16:31:55.897 32016-32056/com.example.androidrxtest I/TAG: complete 3 pool-1-thread-2
2021-04-22 16:31:57.896 32016-32055/com.example.androidrxtest I/TAG: complete 4 pool-1-thread-1
【讨论】: