【问题标题】:Execute parallel tasks with RxJava and wait until the last one of them is completed使用 RxJava 执行并行任务并等待最后一个任务完成
【发布时间】:2019-01-28 16:20:25
【问题描述】:

好吧,这是我在 StackOverflow 中的第一个问题,但经过几天与RxJava 的斗争,我无法找到其他解决方案,我自己尝试了很多东西,挖掘文档和其他帖子,但我不是确定我需要做什么。我尝试了flatMapzipmerge 等的几种组合,但总是走到死胡同,最接近的解决方案是下面的代码。我将不胜感激任何帮助或指导。

我需要一个方法,给定一个输入列表,使用列表的不同输入执行并行调用,并且在所有并行调用完成之前不继续执行。还需要保留不同执行的结果以供以后使用(EDIT:在开始执行的同一线程中)。

public void parallelExecution(List<Integer> calls) {
  List<String> results = new ArrayList<>();
  logger.debug("Starting parallel executions");
  Observable.fromIterable(calls)
      .flatMap(val -> Observable.just(val).observeOn(Schedulers.io())
      .doOnNext(item -> results.add(simpleAsync(item).toString())))
      .subscribe(call -> logger.debug("Execution: {}", Thread.currentThread().getName()));
  logger.debug("Ending parallel executions");
  for (String x : results) {
    logger.debug("Results: {}", x);
  }
}

private Integer simpleAsync(Integer number) {
  Integer result = number * number;
  logger.info("Pre log {}: {}", Thread.currentThread().getName(), result);
  try {
    Thread.sleep(number * 500);
  } catch (Exception e) {
  }
  logger.info("Post log {}: {}", Thread.currentThread().getName(), result);
  return result;
}

问题是这段代码没有“等待”“simpleAsync”方法的执行,它在没有“结果”日志的情况下完成了执行(还没有结果),之后,“发布日志” " 跟踪出现在不同的线程中执行:

Starting parallel executions
Ending parallel executions
Pre log RxCachedThreadScheduler-1: 1
Pre log RxCachedThreadScheduler-2: 4
Pre log RxCachedThreadScheduler-3: 9
Pre log RxCachedThreadScheduler-4: 16
Pre log RxCachedThreadScheduler-5: 25
Post log RxCachedThreadScheduler-1: 1
Execution: RxCachedThreadScheduler-1
Post log RxCachedThreadScheduler-2: 4
Execution: RxCachedThreadScheduler-2
Post log RxCachedThreadScheduler-3: 9
Execution: RxCachedThreadScheduler-3
Post log RxCachedThreadScheduler-4: 16
Execution: RxCachedThreadScheduler-4
Post log RxCachedThreadScheduler-5: 25
Execution: RxCachedThreadScheduler-5

如果我删除“observeOn”语句,该方法会等待调用完成,但它们是按顺序完成的(在同一个线程中):

Starting parallel executions
Pre log Default Executor-thread-9: 1
Post log Default Executor-thread-9: 1
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 4
Post log Default Executor-thread-9: 4
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 9
Post log Default Executor-thread-9: 9
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 16
Post log Default Executor-thread-9: 16
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 25
Post log Default Executor-thread-9: 25
Execution: Default Executor-thread-9
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
Results: 25

【问题讨论】:

    标签: java multithreading concurrency rx-java


    【解决方案1】:

    您是否尝试过使用zip

    public void parallelExecution(List<Integer> calls) {
    
        logger.debug("Starting parallel executions");
    
        // Create an iterable observables
        List<Observable<Integer>> observables = calls.stream()
                .map(i -> {
                    return Observable.fromCallable(() -> simpleAsync(i))
                            .subscribeOn(Schedulers.newThread());
                })
                .collect(Collectors.toList());
    
    
        Observable.zip(observables, objects -> { // Zip observables
                    return Arrays.stream(objects)
                            .map(Object::toString)
                            .collect(Collectors.toList());
                })
                .doOnNext(results -> logger.debug("Ending parallel executions"))
                .subscribe(results -> { // Subscribe to the result.
                    // Put your code that needs to "wait"
                    for (String x : results) {
                        logger.debug("Results: {}", x);
                    }
                });
    }
    

    结果将如下所示:

    Starting parallel executions
    Pre log RxNewThreadScheduler-3: 9
    Pre log RxNewThreadScheduler-1: 1
    Pre log RxNewThreadScheduler-2: 4
    Pre log RxNewThreadScheduler-4: 16
    Pre log RxNewThreadScheduler-5: 25
    Post log RxNewThreadScheduler-1: 1
    Post log RxNewThreadScheduler-2: 4
    Post log RxNewThreadScheduler-3: 9
    Post log RxNewThreadScheduler-4: 16
    Post log RxNewThreadScheduler-5: 25
    Ending parallel executions
    Results: 1
    Results: 4
    Results: 9
    Results: 16
    Results: 25
    

    编辑: 您可以使用observeOn 更改要收听结果的线程。例如,如果您想从调用线程订阅,您可以将代码更改为这样的内容(请参阅these SO 答案):

    final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    logger.debug("Starting parallel executions");
    
    // Create an iterable observables
    List<Observable<Integer>> observables = calls.stream()
            .map(i -> {
                return Observable.fromCallable(() -> simpleAsync(i))
                        .subscribeOn(Schedulers.newThread());
            })
            .collect(Collectors.toList());
    
    
    Observable.zip(observables, objects -> { // Zip observables
                return Arrays.stream(objects)
                        .map(Object::toString)
                        .collect(Collectors.toList());
            })
            .doOnNext(results -> logger.debug("Ending parallel executions"))
            .observeOn(Schedulers.from(tasks::add)) // Add a scheduler with executor from the current thread
            .subscribe(results -> { // Subscribe to the result.
                // Put your code that needs to "wait"
                for (String x : results) {
                    logger.debug("Results: {}", x);
                }
            });
    
    try {
        tasks.take().run();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    【讨论】:

    • 您好 Sanlok Lee,首先感谢您的回答。我已经尝试过了,但这种行为并不完全是我所需要的。关键是结果的最终处理应该在初始执行的同一个线程中完成(打印“开始并行执行”的线程)
    • @RLS,您可以通过在.subscribe 调用之前添加observeOn 来实现该行为。您可以使用scheduler.from() 传入在默认线程上运行的调度程序。回家后我会用一个例子来更新答案。
    • Sanlok Lee,非常感谢您的帮助。我实现了相同的结果,使当前线程等待并调用一个方法,该方法通知当前线程在可观察对象完成后继续:.doOnComplete(() -&gt; threadNotifier(currentThread)),但我的选择更“脏”,需要更多的错误管理。
    【解决方案2】:

    我建议你的反应性思考不够:

    public Single<List<String>> parallelExecution(List<Integer> calls) {
      return Observable
          .fromIterable(calls)
          .flatMap(val -> Observable.fromCallable(() -> simpleAsync(val).toString())
                                    .subscribeOn(Schedulers.io())
          .toList();
    }
    
    • .toList() 将收集所有结果并在flatMap 完成时提供单个项目
    • 您想使用subscribeOn,而不是observeOn
    • 如果simpleAsync 返回一个响应式对象,这会更简单。
    • 如果您需要保持parallelExecution 无响应,请使用blockingGet 之类的内容。

    【讨论】:

    • 您好,塔索斯,感谢您的回答。我已经尝试过您的 cmets,但我对 RxJava 的工作并不熟悉。我已更改 simpleAsync 以返回带有结果的 Observable,所以现在 flatMap.flatMap(val -&gt; simpleAsync(val)).subscribeOn(Schedulers.io()).toList()。我还添加了blockingGet,所以现在我不使用任何subscibed 语句,但是5个“并行”执行是在同一个线程(不是主体)中执行的,所以它们不是并行执行的跨度>
    【解决方案3】:

    感谢您的问题和解决方案,基于这些,我正在编写一个更清晰的实用方法,以便在需要时调用:

     public static Disposable parallelExecution(Runnable afterExecutionRunnable, Runnable... calls) {       
        return Observable.fromArray(calls)
                .flatMap(callable -> Completable.fromRunnable(callable).subscribeOn(Schedulers.io()).toObservable())
                .toList().subscribeOn(Schedulers.io())
                .subscribe(voids -> {
                    if (afterExecutionRunnable != null) {
                        afterExecutionRunnable.run();
                    }
                });
    }
    

    调用示例:

        RxJavaUtils.parallelExecution(() -> {
                    //Here action when all tasks ends
                },               
                this::taskMethod1,
                this::taskMethod2,
              //sequential tasks 
                () -> {
                    taskMethod3();
                    taskMethod4ThatDependsOf3();
                },
                this::taskMethod5);
    

    您可以取消对实用方法返回的一次性调用 .dispose() 的任务的并行执行

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-10-21
      • 1970-01-01
      • 2012-04-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多