【问题标题】:Performance implications of Java parallel streamsJava 并行流的性能影响
【发布时间】:2019-02-07 19:26:32
【问题描述】:

使用.stream().parallel() 的最佳做法是什么?

例如,如果您有一堆阻塞 I/O 调用,并且您想检查 .anyMatch(...),并行执行此操作似乎是明智之举。

示例代码:

public boolean hasAnyRecentReference(JobId jobid) {
  <...>
  return pendingJobReferences.stream()
     .parallel()
     .anyMatch(pendingRef -> { 
       JobReference readReference = pendingRef.sync();
       Duration referenceAge = timeService.timeSince(readReference.creationTime());
       return referenceAge.lessThan(maxReferenceAge)
     });
}

乍一看这似乎很合理,因为我们可以同时执行多个阻塞读取,因为我们只关心匹配的任何一个,而不是一个接一个地检查(所以如果每次读取需要 50 毫秒,我们只需要等待( 50ms * expectedNumberOfNonRecentRefs) / numThreads ).

在生产环境中引入此代码是否会对代码库的其他部分产生任何不可预见的性能影响?

【问题讨论】:

    标签: java concurrency parallel-processing java-stream forkjoinpool


    【解决方案1】:

    编辑:正如@edharned 指出的那样,.parallel() 现在使用CountedCompleter 而不是调用.join(),这有其自身的问题,Ed 在http://coopsoft.com/ar/Calamity2Article.htmlWhat is currently being done? 部分下也解释了这一点。

    我相信下面的信息对于理解为什么 fork-join 框架很棘手以及结论中对.parallel() 提出的替代方案仍然有用。


    虽然代码的精神是正确的,但实际代码可能会对所有使用 .parallel() 的代码产生系统范围的影响,即使这一点都不明显。

    不久前我发现一篇文章建议不要这样做:https://dzone.com/articles/think-twice-using-java-8,但直到最近我才深入挖掘。

    这些是我读了一堆之后的想法:

    1. Java 中的.parallel() 使用ForkJoinPool.commonPool(),这是一个由所有流共享的单例ForkJoinPoolForkJoinPool.commonPool() 是一种公共静态方法,因此理论上其他库/部分代码可以使用它)
    2. ForkJoinPool 实现工作窃取,除了共享队列外,还具有每个线程的队列

      1. 工作窃取意味着当一个线程空闲时,它会寻找更多的工作来做
      2. 一开始我想:按照这个定义,cached 线程池不是也做工作窃取(即使一些引用称缓存线程池的工作共享)?
      3. 事实证明,使用空闲这个词时似乎有些术语模糊:

        1. cached 线程池中,线程仅在完成其任务后才空闲。如果它在等待阻塞调用时被阻塞,它确实不会变得空闲
        2. forkjoin 线程池中,线程在完成其任务或在子任务上调用.join() 方法(这是一个特殊的阻塞调用)时处于空闲状态。

          在子任务上调用.join() 时,线程在等待该子任务完成时变为空闲状态。在空闲时,它会尝试执行任何其他可用任务,即使它在另一个线程的队列中(它窃取工作)。

          [这是重要的一点]一旦它找到另一个要执行的任务,它必须在恢复原来的执行之前完成它,即使它正在等待的子任务在线程仍在运行时完成执行偷来的任务。

          [这也很重要] 这种工作窃取行为仅适用于调用.join() 的线程。如果一个线程在其他事情上被阻塞,比如 I/O,它就会变得空闲(即它不会窃取工作)。

    3. Java 流不允许您提供自定义 ForkJoinPool,但 https://github.com/amaembo/streamex 可以

    我花了一段时间才理解 2.3.2 的含义,所以我将举一个简单的例子来帮助说明这个问题:

    注意:这些是虚拟示例,但您可以通过使用流在内部执行 fork join 操作而在没有意识到的情况下进入等效情况。

    另外,我将使用极其简化的伪代码,仅用于说明 .parallel() 问题,但在其他方面不一定有意义。

    假设我们正在实现归并排序

    merge_sort(list):
        left, right = split(list)
    
        leftTask = mergeSortTask(left).fork()
        rightTask = mergeSortTaks(right).fork()
    
        return merge(leftTask.join(), rightTask.join())
    

    现在假设我们有另一段代码执行以下操作:

    dummy_collect_results(queriesIds):
       pending_results = []
    
       for id in queriesIds: 
         pending_results += longBlockingIOTask(id).fork()
    
      // do more stuff
    

    这里发生了什么?

    当您编写合并排序代码时,您认为排序调用不进行任何 I/O,因此它们的性能应该是相当确定的,对吧?

    没错。你可能没想到的是,由于dummy_collect_results 方法创建了一堆长时间运行和阻塞的子任务,当执行合并排序任务的线程阻塞在.join() 上,等待子任务完成时,它们可能会开始执行其中一个长阻塞子任务。

    这很糟糕,因为如上所述,一旦长阻塞(在 I/O 上,不是 .join() 调用,因此线程不会再次空闲)被盗,它必须完成,不管线程通过.join() 等待的子任务是否在阻塞 I/O 时完成。

    这使得归并排序任务的执行不再具有确定性,因为执行这些任务的线程最终可能会窃取由完全位于其他地方的代码生成的 I/O 密集型任务。

    这也是非常可怕且难以捕捉的,因为您可以在整个代码库中使用 .parallel() 而没有任何问题,并且只需要一个类在使用 .parallel() 时引入长时间运行的任务以及所有突然,您的代码库的所有其他部分可能会出现不一致的性能。

    所以我的结论是:

    1. 理论上,.parallel() 可以保证在代码中的任何位置创建的所有任务都很短
    2. .parallel() 可能会对系统范围的性能产生影响,除非您知道(例如,如果您稍后添加使用 .parallel() 并且任务很长的单段代码,您可能会影响使用的所有代码的性能.parallel())
    3. 因为2.,你最好完全避免.parallel(),要么使用ExecutorCompletionService,要么使用https://github.com/amaembo/streamex,这样你就可以提供自己的ForkJoinPool(允许更多的隔离)。更好的是,您可以使用https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53,它可以让您对并发机制进行更细粒度的控制。

    【讨论】:

    • parallel() 使用 CountedCompleter 类,而不是 join()。正如你所指出的,我第一次写的时候,join() 不像广告宣传的那样工作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-09-06
    • 1970-01-01
    • 2014-08-28
    • 2011-02-27
    • 2014-07-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多