【问题标题】:Calling two functions in parallel in Java 8在 Java 8 中并行调用两个函数
【发布时间】:2021-02-04 01:09:28
【问题描述】:

我的 Spring Boot 应用程序中有一个用例如下:

我想使用以下函数从响应中获取 id 字段值:

String id = getIdFromResponse(response);

如果我在响应中没有得到任何 id,那么我使用以下函数检查请求参数中是否存在 id 字段:

String id = getIdFromRequest(request);

到目前为止,我正在按顺序调用它们。但我想让这两个函数并行运行,我想从它们中的任何一个获得 id 后立即停止。

我想知道是否有任何方法可以在 Java 8 中使用 streams 来实现这一点。

【问题讨论】:

  • 小心在生产环境中执行此代码,有时不应该进行数据库调用,这可以通过if 检查来阻止。
  • 这是纳曼的一个好点。顺序处理确保第二次调用仅在第一次调用没有返回结果时发生。这允许使用调用可能返回id 的概率来优化流程。并行的不保证并且很可能执行两个数据库调用(竞争条件)。
  • @NikolasCharalambidis 当然,感谢您指出这一点。我会记住这一点。然而,这两个函数实际上都没有与 DB 交互。

标签: java spring multithreading spring-boot java-8


【解决方案1】:

你可以这样使用:

String id = Stream.<Supplier<String>>of(
        () -> getIdFromResponse(response), 
        () -> getIdFromRequest(request)
    )
    .parallel()
    .map(Supplier::get)
    .filter(Objects::nonNull)
    .findFirst()
    .orElseThrow():

需要供应商,因为当你不使用它们时,两个请求仍然按顺序执行。

我还假设您的方法在找不到任何内容时返回 null,因此我必须使用 .filter(Objects::nonNull) 过滤掉这些值。

根据您的用例,您可以将 .orElseThrow() 替换为不同的内容,例如 .orElse(null)

【讨论】:

    【解决方案2】:

    只要有专门的方法,就不需要使用 Stream API。

    ExecutorService::invokeAny(Collection&lt;? extends Callable&lt;T&gt;&gt;)

    执行给定任务,返回已成功完成的任务的结果(即不抛出异常),如果有的话。正常或异常返回时,取消未完成的任务。

    List<Callable<String>> collection = Arrays.asList(
        () -> getIdFromResponse(response),
        () -> getIdFromRequest(request)
    );
    
    // you want the same number of threads as the size of the collection
    ExecutorService executorService = Executors.newFixedThreadPool(collection.size());
    String id = executorService.invokeAny(collection);
    

    三个注意事项:

    【讨论】:

    • 我真的很喜欢这个答案,比像我在答案中那样乱搞Streams 要好得多。旁注:也可以使用ForkJoinPool.commonPool() 作为执行器服务。这样就不必自己创建一个。
    • 谢谢 Lino,我不知道这样的游泳池存在。浏览文档后,我知道这应该避免在单个 JVM 上创建太多线程池的开销。 ForkJoinPool 也实现了工作窃取。好点子!
    【解决方案3】:

    如果您想完全控制何时启用替代评估,您可以使用CompletableFuture

    CompletableFuture<String> job
        = CompletableFuture.supplyAsync(() -> getIdFromResponse(response));
    String id;
    try {
        id = job.get(300, TimeUnit.MILLISECONDS);
    }
    catch(TimeoutException ex) {
        // did not respond within the specified time, set up alternative
        id = job.applyToEither(
            CompletableFuture.supplyAsync(() -> getIdFromRequest(request)), s -> s).join();
    }
    catch(InterruptedException|ExecutionException ex) {
        // handle error
    }
    

    仅当第一个作业在指定时间内未完成时才提交第二个作业。然后,无论哪个作业先响应,都会提供结果值。

    【讨论】:

    • 不错的一个!遗憾的是,CompletableFuture 周围的 API 对异常处理并不友好。
    • @NikolasCharalambidis 问题出在哪里?恕我直言,这是一个比使用ExecutorService 更干净、更高级别的方法,并且以同样的方式“异常处理友好”。
    • 我的意思是答案很好,但CompletableFuture API 对我来说似乎并不灵活。我很感激这些异常状态可以通过 API 使用 ex 来处理。 lambda 表达式而不是捕获 ExecutorService 所需的异常。
    • @NikolasCharalambidis CompletableFuture API 允许通过 lambda 表达式处理异常。但这种方法的关键在于使用与ExecutorService.invokeAny 相同的并发API 中的Future.get(long,TimeUnit),因此它需要处理相同的异常。我考虑过使用 JDK 9+ orTimeout 和 lambda 表达式提供替代方案,但结果要复杂得多。
    • @Lino no,supplyAsync 没有明确的Executor 将提交给ForkJoinPool.commonPool()。但结果取决于实际行为。当第一次尝试在指定时间内完成时,第二次将不会启动,这就是这种方法的意义所在。但是当它没有完成时,第二个是同时启动的,一旦它们中的任何一个完成,发起者就可以继续执行结果。与并行流不同,它仍将等待所有线程完成。
    猜你喜欢
    • 2018-06-05
    • 2017-08-24
    • 1970-01-01
    • 1970-01-01
    • 2019-08-09
    • 2021-06-08
    • 1970-01-01
    • 2017-03-02
    • 1970-01-01
    相关资源
    最近更新 更多