【发布时间】:2019-01-28 16:20:25
【问题描述】:
好吧,这是我在 StackOverflow 中的第一个问题,但经过几天与RxJava 的斗争,我无法找到其他解决方案,我自己尝试了很多东西,挖掘文档和其他帖子,但我不是确定我需要做什么。我尝试了flatMap、zip、merge 等的几种组合,但总是走到死胡同,最接近的解决方案是下面的代码。我将不胜感激任何帮助或指导。
我需要一个方法,给定一个输入列表,使用列表的不同输入执行并行调用,并且在所有并行调用完成之前不继续执行。还需要保留不同执行的结果以供以后使用(EDIT:在开始执行的同一线程中)。
public void parallelExecution(List<Integer> calls) {
List<String> results = new ArrayList<>();
logger.debug("Starting parallel executions");
Observable.fromIterable(calls)
.flatMap(val -> Observable.just(val).observeOn(Schedulers.io())
.doOnNext(item -> results.add(simpleAsync(item).toString())))
.subscribe(call -> logger.debug("Execution: {}", Thread.currentThread().getName()));
logger.debug("Ending parallel executions");
for (String x : results) {
logger.debug("Results: {}", x);
}
}
private Integer simpleAsync(Integer number) {
Integer result = number * number;
logger.info("Pre log {}: {}", Thread.currentThread().getName(), result);
try {
Thread.sleep(number * 500);
} catch (Exception e) {
}
logger.info("Post log {}: {}", Thread.currentThread().getName(), result);
return result;
}
问题是这段代码没有“等待”“simpleAsync”方法的执行,它在没有“结果”日志的情况下完成了执行(还没有结果),之后,“发布日志” " 跟踪出现在不同的线程中执行:
Starting parallel executions
Ending parallel executions
Pre log RxCachedThreadScheduler-1: 1
Pre log RxCachedThreadScheduler-2: 4
Pre log RxCachedThreadScheduler-3: 9
Pre log RxCachedThreadScheduler-4: 16
Pre log RxCachedThreadScheduler-5: 25
Post log RxCachedThreadScheduler-1: 1
Execution: RxCachedThreadScheduler-1
Post log RxCachedThreadScheduler-2: 4
Execution: RxCachedThreadScheduler-2
Post log RxCachedThreadScheduler-3: 9
Execution: RxCachedThreadScheduler-3
Post log RxCachedThreadScheduler-4: 16
Execution: RxCachedThreadScheduler-4
Post log RxCachedThreadScheduler-5: 25
Execution: RxCachedThreadScheduler-5
如果我删除“observeOn”语句,该方法会等待调用完成,但它们是按顺序完成的(在同一个线程中):
Starting parallel executions
Pre log Default Executor-thread-9: 1
Post log Default Executor-thread-9: 1
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 4
Post log Default Executor-thread-9: 4
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 9
Post log Default Executor-thread-9: 9
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 16
Post log Default Executor-thread-9: 16
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 25
Post log Default Executor-thread-9: 25
Execution: Default Executor-thread-9
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
Results: 25
【问题讨论】:
标签: java multithreading concurrency rx-java