【问题标题】:Nested parallel stream execution in Java - findAny() randomly failsJava 中的嵌套并行流执行 - findAny() 随机失败
【发布时间】:2020-11-20 12:01:16
【问题描述】:

对于 same 输入,以下代码在每 10-15 次尝试中抛出 IllegalArgumentException

        AllDirectedPaths<Vertex, Edge> allDirectedPaths = new AllDirectedPaths<>(graph);
        List<GraphPath<Vertex, Edge>> paths = allDirectedPaths.getAllPaths(entry, exit, true, null);


        return paths.parallelStream().map(path -> path.getEdgeList().parallelStream()
                .map(edge -> {
                    Vertex source = edge.getSource();
                    Vertex target = edge.getTarget();

                    if (source.containsInstruction(method, instructionIndex)) {
                        return source;
                    } else if (target.containsInstruction(method, instructionIndex)) {
                        return target;
                    } else {
                        return null;
                    }
                }).filter(Objects::nonNull)).findAny().flatMap(Stream::findAny)
                .orElseThrow(() -> new IllegalArgumentException("Given trace refers to no vertex in graph!"));

代码的想法是找到一个包装特定指令的顶点(参见containsInstruction()),而该顶点至少位于从entryexit 顶点的一条路径上。我知道代码在性能方面并不是最优的(路径上的每个中间顶点都被查找两次),但这并不重要。

输入只是一个跟踪(字符串),可以从中导出methodinstructionIndex。从这个意义上说,所有其他变量都是固定的。而且containsInstruction()这个方法没有任何副作用。

将“findAny()”流操作放在哪里重要吗?我应该把它直接放在过滤器操作之后吗?还是嵌套的并行流是问题所在?

【问题讨论】:

    标签: java parallel-processing java-stream


    【解决方案1】:

    您应该使用.flatMap(path -&gt; ... ) 并删除.flatMap(Stream::findAny)

    您的代码不起作用,因为第一个 findAny() 返回一个始终为非 null 的流,但它可能包含 null 元素。

    然后,当您通过Optional.flatMap(Stream::findAny) 调用应用第二个findAny() 时,最后一个查找操作可能会返回一个空的Optional,结果是内部流的null 元素结束.

    这就是代码的样子:

    return paths.stream()
        .flatMap(path -> path.getEdgeList().stream()
            .map(edge -> 
                 edge.getSource().containsInstruction(method, instructionIndex) ?
                 edge.getSource()                                               :
                 edge.getTarget().containsInstruction(method, instructionIndex) ?
                 edge.getTarget()                                               :
                 null)
            .filter(Objects::nonNull))
        .findAny()
        .orElseThrow(() -> new IllegalArgumentException("whatever"));
    

    请注意:为什么是并行流?您的管道中似乎没有 CPU 绑定任务。此外,并行流会产生大量开销。它们在极少数情况下很有用,即流水线中的数万个元素和密集的 CPU 操作


    编辑: 正如 cmets 中所建议的,内部流的 mapfilter 操作可以安全地移至外部流。这样,可读性得到了提高,并且在性能方面没有差异:

    return paths.stream()
        .flatMap(path -> path.getEdgeList().stream())
        .map(edge -> 
             edge.getSource().containsInstruction(method, instructionIndex) ?
             edge.getSource()                                               :
             edge.getTarget().containsInstruction(method, instructionIndex) ?
             edge.getTarget()                                               :
             null)
        .filter(Objects::nonNull)
        .findAny()
        .orElseThrow(() -> new IllegalArgumentException("whatever"));
    

    另一个注意事项:也许将map 中的代码重构为Edge 类的方法会更好,以便返回源、目标或null 的逻辑在已经有的类中所有信息。

    【讨论】:

    • 我会试试的。我只是使用并行流,因为它对我来说似乎很自然。它是少数没有共享变量减慢/阻碍并行执行的用例之一。
    • @auermich 在做出此类决定之前,请务必仔细测量和进行微基准测试。您肯定会注意到并行流的速度较慢
    • 只是为了澄清原始问题。内部映射可能会生成 [null,..,null] 形式的单个路径,其中过滤器会导致空列表,而 findAny() 可能会选择这样的空 Optional 导致异常?
    • 我认为,当嵌套代码尽可能小时,它会更具可读性。 mapfilter 步骤不需要附加到内部流表达式。他们可以在.flatMap(path -&gt; path.getEdgeList().stream()) 之后被链接
    • 在性能方面没有预期的差异,它只是影响了可读性。只有像sorteddistinct 这样的有状态操作会有所不同,但这应该很明显,因为移动它们将是语义上的变化。当中间操作需要访问path 时,它们必须留在内部操作中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-08
    • 1970-01-01
    • 2012-12-15
    • 1970-01-01
    • 2019-02-23
    相关资源
    最近更新 更多