【问题标题】:Wait for the first thread to finish before running other threads in parallel在并行运行其他线程之前等待第一个线程完成
【发布时间】:2016-03-04 07:28:08
【问题描述】:

在我的主要流程中,我想并行执行一些任务。假设我有 4 个任务要在单独的流程中执行,但是在这 4 个任务中,需要先完成第一个任务,然后才能并行运行其他任务。

我认为我的代码可以按照我的要求运行,但有没有更好的方法?

public static void main(String[] args) {
        System.out.println("In main");

        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.submit(() -> {
            runParalleltasks();
        });

        executor.shutdown();
        System.out.println("Exiting main");
    }

    private static void runParalleltasks() {
        System.out.println("Running parallel tasks");
        doTask1();
        ExecutorService executor = Executors.newFixedThreadPool(3);
        executor.submit(() -> {
            doTask2();
        });
        executor.submit(() -> {
            doTask3();
                });
        executor.submit(() -> {
            doTask4();
        });

        executor.shutdown();
        System.out.println("Exiting parallel tasks");
    }

【问题讨论】:

  • 为什么要有单线程执行器?
  • @matt 因为我想在与 main 不同的流程中执行我的 4 个任务,但是在这 4 个任务中,需要先完成第一个任务,然后才能并行运行其他任务。
  • 我永远无法理解为什么人们在想要顺序执行时使用线程。 显然您应该内联运行第一个任务,然后生成其他三个任务。
  • @EJP 我需要在与 main 方法不同的流程中执行我的 4 个任务,并且 4 个任务中的最后 3 个任务取决于第一个任务的输出。
  • 有谁知道在哪里可以用 Callables 而不是 Runnables 找到这个问题的答案?

标签: java multithreading parallel-processing java-8


【解决方案1】:

您可能想看看新的CompletableFuture 设施。对于像您这样的琐碎案例,它可能无法提供很多帮助,但它提供了很大的灵活性:

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;

runAsync(() -> doTask1(), executor)
        .thenCompose(v -> allOf(
                runAsync(() -> doTask2(), executor),
                runAsync(() -> doTask3(), executor),
                runAsync(() -> doTask4(), executor)
        ));

如果您需要将 task1 的输出传递给相关任务,那么这种方法真正让您眼前一亮的地方。假设doTask1 返回一个String 并且doTask2 接受一个String。现在我们应该使用supplyAsync,而不是runAsync 来处理第一个任务:

supplyAsync(() -> doTask1(), executor)
        .thenCompose(resultOf1 -> allOf(
                runAsync(() -> doTask2(resultOf1), executor),
                runAsync(() -> doTask3(), executor),
                runAsync(() -> doTask4(), executor)
        ));

【讨论】:

  • 但是这种方法会保留我的主要方法直到完成。我需要退出 main 而不等待这 4 个任务完成。
  • 在这种情况下,您的 main 方法应该返回 CompletableFuture<Void> 而不是 joining。我编辑了答案以反映这一点。
【解决方案2】:

您也可以使用ExecutorService 来执行此操作,就像您原来的方法一样。您所要做的就是使用 submit 的结果,它允许后续任务等待其完成:

ExecutorService executor = Executors.newFixedThreadPool(4);
Future<?> task1 = executor.submit(() -> doTask1());
Stream.<Runnable>of(() -> doTask2(), () -> doTask3(), () -> doTask4())
      .forEach(r -> executor.submit(() -> { try {
              task1.get();
              r.run();
          } catch(InterruptedException|ExecutionException ex){}
      }));
executor.shutdown();

我们必须在等待第一个任务完成时捕获异常,但这也为第一个任务失败或被取消时故意跳过下一个任务提供了机会。由于正常情况下,任务之间存在依赖关系,因为后续任务需要第一个任务的result,所以通过get()等待是很自然的方式。只要doTask1() 返回一个值,上面的submit 调用将使用submit(Callable) 而不是submit(Runnable),并且返回的Future 将具有反映方法返回类型的泛型类型,因此结果可以是通过其get() 方法检索。

为了避免死锁,必须满足以下条件之一:

  • 所有提交的任务都有足够的线程,或者
  • 队列的检索顺序与提交顺序相同

在示例中,甚至两者都适用。

请注意,上面的流使用不是必需的,经典循环也可以做到这一点:

for(Runnable r: Arrays.<Runnable>asList(() -> doTask2(), () -> doTask3(), () -> doTask4()))
    executor.submit(() -> { try {
            task1.get();
            r.run();
        } catch(InterruptedException|ExecutionException ex){}
    });

【讨论】:

    猜你喜欢
    • 2020-09-21
    • 1970-01-01
    • 2012-04-09
    • 1970-01-01
    • 1970-01-01
    • 2021-06-21
    • 1970-01-01
    • 1970-01-01
    • 2011-07-27
    相关资源
    最近更新 更多