【问题标题】:Java 8 Running Multiple Methods In ParallelJava 8 并行运行多个方法
【发布时间】:2017-11-20 21:38:40
【问题描述】:

我有 2 个方法具有不同的返回类型,我想同时运行它们。这是我的代码:

public void method(int id) {
    final CompletableFuture<List<FooA>> fooACF = CompletableFuture.supplyAsync(() -> generateFooA(id));
    final CompletableFuture<List<FooB>> fooBCF = CompletableFuture.supplyAsync(() -> generateFooB(id));
    List<FooA> fooAs = fooACF.get();
    List<FooB> fooBs = fooBCF.get();
    //Do more processesing
}

public List<FooA> generateFooA(int id) {
    //code
}

public List<FooB> generateFooB(int id) {
    //code
}

但我不知道这两种方法是否会与上述代码并行运行,或者我是否最好说:

List<FooA> fooAs = generateFooA(id);
List<FooB> fooBs = generateFooB(id);

我如何正确使用可完成的期货才能并行运行这两种方法?

【问题讨论】:

  • 您的代码看起来不错。您有什么特别的顾虑吗?
  • 如果您询问您所做的是否会在 2 个单独的线程上运行这两种方法,那么是的,您正确使用了 CompletableFuture

标签: java concurrency java-8 java.util.concurrent completable-future


【解决方案1】:

您的代码工作正常,使用ForkJoinPool.commonPool() 提供的线程,正如CompletableFuture.supplyAsync(Supplier&lt;U&gt; supplier) 的JavaDoc 所承诺的那样。您可以通过添加一些sleep()println() 语句以快速而简单的方式证明它。我通过使用String 而不是List&lt;Foo&gt; 稍微简化了您的代码:

public void method(int id) throws InterruptedException, ExecutionException {
    CompletableFuture<String> cfa = CompletableFuture.supplyAsync(() -> generateA(id));
    CompletableFuture<String> cfb = CompletableFuture.supplyAsync(() -> generateB(id));
    String fooA = cfa.get();
    String fooB = cfb.get();
    System.out.println("Final fooA " + fooA);
    System.out.println("Final fooB " + fooB);
}

public String generateA(int id) {
    System.out.println("Entering generateA " + Thread.currentThread());
    sleep(2000);
    System.out.println("Leaving generateA");
    return "A" + id;
}

public String generateB(int id) {
    System.out.println("Entering generateB " + Thread.currentThread());
    sleep(1000);
    System.out.println("Leaving generateB");
    return "B" + id;
}

private void sleep(int n) {
    try {
        Thread.sleep(n);
    } catch (InterruptedException ex) {
        // never mind
    }
}

输出是:

Entering generateFooA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Entering generateFooB Thread[ForkJoinPool.commonPool-worker-2,5,main]
Leaving generateFooB
Leaving generateFooA
Final fooA A1
Final fooB B1

您可以手动观察“离开”输出行在 1 秒和 2 秒后出现。如需更多证据,您可以在输出中添加时间戳。如果您更改睡眠的相对长度,您将看到“离开”输出以不同的顺序显示。


如果您省略了sleep()s,那么第一个线程很可能会很快完成,以至于在第二个线程开始之前就完成了:

Entering generateA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateA
Entering generateB Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateB
Final fooA A1
Final fooB B1

请注意,这一切都发生得如此之快,以至于在运行时请求第二个线程时,线程已经返回到池中。所以原来的线程被重用了。

可以想象,这也可能发生在非常短的睡眠中,尽管在我的系统上,每次运行它时 1 毫秒的睡眠就足够了。当然,sleep() 是需要时间才能完成的“真实”操作的占位符。如果您的实际操作如此便宜以至于它在另一个线程启动之前完成,那么这很好地暗示了这是一个多线程无益的场景。


但是如果您需要询问如何证明事情是同时发生的,我想知道您为什么首先希望它们同时发生。如果您的程序在同时或按顺序执行这些任务时没有“现实世界”可观察到的差异,那么为什么不让它按顺序运行呢?更容易推理顺序操作;有很多与并发相关的偷偷摸摸的错误。

也许您希望通过多线程来提高速度——如果是这样的话,速度的提高应该是您所衡量的,而不是事情是否实际上是并发的。请记住,对于非常多的任务,CPU 并行执行它们的速度比顺序执行更快。

【讨论】:

    【解决方案2】:

    您缺少Executor

    ExecutorService executor = Executors.newCachedThreadPool();
    List<Future<?>> = Stream.<Runnable>of(() -> generateFooA(id), () -> generateFooA(id))
            .map(executor::submit)
            .collect(Collectors.toList());
    for (Future<?> future : futures) {
        future.get(); // do whatever you need here
    }
    

    Runnables 在您 submit 他们时开始执行。 get() 会尽快返回。例如,如果你get() 的第一个未来是最慢的,那么所有其他get() 调用将立即返回。

    【讨论】:

    • 回复,“您缺少Executor。” Executor 隐含在 OP 的示例中。该示例将任务提交到ForkJoinPool.commonPool()
    • @James 这不是一回事。 Executor 尽最大努力并行执行,并且此代码专用于它,因此并行执行是“可能的”。但是 ForkJoinPool.commonPool() 在所有流之间共享,因此并行执行的可能性较小(但并非不可能或当然)。
    • 这两种方法都“尽力而为”来并行执行,但是,即使您设法为每个任务创建一个不同的线程,您也永远无法保证并行执行。但即使这样也不能通过使用线程池执行器来保证。当第一个工作线程在提交第二个任务之前设法完成第一个任务时,它将接手第二个任务(那时并不重要)。如果 F/J 的 common pool 由于其他任务而没有空闲容量,则意味着工作负载占用了所有 cpu 核心,因此 有可能提交线程没有在两个任务之间运行……
    • @Holger 当然,但我说的是公共池中的线程忙于执行长时间运行的任务的情况(这将是一个糟糕的选择,但我已经看到它发生了)并且 JVM 中的所有流都必须等到它们完成。如果你有一个单独的 executor,虽然它的线程仍然要竞争调度,但它们不会排在其他任务后面。我知道这是一个极端情况,但问题要求并行执行。此外,如果 OP 的任务长时间运行,您不希望它们在公共池中执行,因为您将在其他地方遇到活力问题。
    • 让潜在的短期任务等待长时间运行的任务完成是没有问题的,实际上,总执行时间可能比交错执行的时间更短。只要所有 CPU 核心都在处理任务,这不是一个活力问题,只要你只是在谈论执行时间。但如果你在谈论优先级、交互任务或 I/O 操作,仅仅使用另一个线程池并不足以解决问题。请注意,对于同等优先级的线程,规范根本不保证抢先式任务切换……
    【解决方案3】:

    正如我在评论中所说,请查看How to start two threads at "exactly" the same time,但这应该是你要找的东西

    final CyclicBarrier gate = new CyclicBarrier(3);
    public void method(int id) {
        Thread one = new Thread (()->{
            gate.await();
            List<FooA> fooAs = generateFooA(id);
        });
        Thread two = new Thread (()->{
            gate.await();
            List<FooB> fooBs = generateFooB(id);
        });
        one.start();
        two.start();
        gate.await();
        //Do more processesing
    }
    
    public List<FooA> generateFooA(int id) {
        //code
    }
    
    public List<FooB> generateFooB(int id) {
        //code
    }
    

    【讨论】:

    • 盯着 2 个额外线程并使用 CompletableFuture 完全没有意义。
    • generateFooA(id) 调用和generateFooB(id) 调用可能正在顺利进行,在不同的线程中运行,在您的one 线程或您的two 线程到达gate.await() 之前打电话。
    • 您似乎认为调用get() 正在启动任务,但这些任务是在supplyAsync 内安排的。即使没有人调用get()join(),它们也会运行完成。除此之外,“完全同时”运行根本不是一个有用的目标。即使您设法同时启动两个任务,它们也可能在下一纳秒内不同步。不保证执行时间或线程调度。
    • 您将如何访问fooAsfooBs 并使用它们进行更多处理?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-06-28
    • 1970-01-01
    • 2019-12-16
    • 1970-01-01
    • 2021-08-03
    • 2021-09-11
    • 2019-08-01
    相关资源
    最近更新 更多