【问题标题】:CompletableFuture stream created from iterator is not lazily evaluated从迭代器创建的 CompletableFuture 流不会被延迟评估
【发布时间】:2018-10-26 02:08:21
【问题描述】:

我对如何以及何时完成可完成的期货有点苦恼。我已经创建了这个测试用例:

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
                Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
                .map(this::cf)
                .limit(2)
                .parallel()
                .forEach(cf -> {
                    try {
                        System.out.println(cf.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}

输出是:

Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one

testList 方法按预期工作。 CompletableFuture 只在最后评估,所以在 limit 方法之后只保留了前两项。

但是,testIterator 方法是出乎意料的。所有CompletableFuture 都已完成,限制仅在之后完成。

如果我从流中删除 parallel() 方法,它会按预期工作。但是,处理(forEach())应该并行完成,因为在我的完整程序中它是一个长时间运行的方法。

谁能解释为什么会这样?

看起来这取决于 Java 版本,所以我使用的是 1.8:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

【问题讨论】:

  • 在 Windows 10 64 位上与 java 9.0.4 (build 9.0.4+11) 配合良好
  • 是的,我确认它使用jdk-1.8.0_152 给出了不正确的结果
  • @MarkoPacak 感谢您发现这是与 Java 版本相关的。我什至没有想到这一点

标签: java java-stream completable-future spliterator


【解决方案1】:

并行性适用于整个管道,因此您无法真正控制在limit() 并行应用Stream 之前将执行什么。唯一的保证是limit() 之后的内容只会在保留的元素上执行。

两者之间的差异可能是由于某些实现细节或其他Stream 特性。事实上,您可以通过使用SIZED 特性轻松地反转行为。似乎当 Stream 的大小已知时,只处理了 2 个元素。

例如,应用一个简单的filter() 会丢失列表版本的大小:

completeFirstTwoElements(
        Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);

例如输出:

Running list one
Running list five
Running list two
Running list three
list one
list two

并且不使用 unknown size 版本的Spliterator.spliterator()“修复”了该行为:

Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

completeFirstTwoElements(
        StreamSupport.stream(Spliterators.spliterator(iterator, Spliterator.ORDERED, 5), false)
);

输出:

Running iterator two
Running iterator one
iterator one
iterator two

【讨论】:

  • 有趣的是,Java 9 对待“未知大小”的方式与带有估计值的流不同,因此将修复 Spliterators.spliterator(iterator, Spliterator.ORDERED, 5) 与随后的 a -&gt; true 过滤器组合起来会比 Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED) 执行得更差,无论是否使用过滤器.不管怎样,恭喜你比我快 10 秒。
【解决方案2】:

您的陈述“所有CompletableFutures 都已完成”等同于“所有CompletableFutures 已创建”,因为一旦supplyAsync 已执行,供应商的评估已安排,无论是否有人会最终是否调用get

所以你在这里感知到的,是传递给map的函数的求值,即使后续处理也不会消耗结果。这是一个有效的行为;只要 Stream 之后会使用正确的结果,就限制和遇到顺序而言,该函数可能会以任意顺序甚至同时对更多元素进行评估。

现在,是否会评估超过必要的元素以及处理了多少多余的元素,这是一个实现细节,并且实现已经改变,如“Internal changes for limit and unordered stream”中所述。虽然问答是关于无序流的,但有可能对有序流进行了类似的改进。

要点是,您不应假设仅针对所需元素的最少数量评估函数。这样做会降低并行处理的效率。这仍然适用,即使 Java 9 改进了并行 limit 操作。一个简单的改变可能会重新引入对更多元素的评估:

private void completeFirstTwoElements(Stream<String> stream) {
    stream.map(this::cf)
          .filter(x -> true)
          .limit(2)
          .parallel()
          .forEach(cf -> System.out.println(cf.join()));
}

【讨论】:

  • 感谢您的回复(您也是@didier-l)。还有一个问题:有没有办法取消调度?在我的完整程序中,supplyAsync 调用中发生的事情需要很长时间,并且会阻止程序终止,直到所有 supplyAsync 都完全执行。这不是必需的,我只想要前 10 个。(顺便说一句,我觉得我需要重新考虑我的程序的架构,因为 CompletableFuture 似乎与我想做的事情不太匹配)
  • 取消CompletableFuture 不会中断已经进行的评估,它可能只会阻止对取消时尚未开始的函数的评估。但是,这不会阻止您的应用程序终止。仅当您在关联的未来调用 getjoin() 时,您的应用程序才会被阻止。如果您取消了未来,您甚至不会在 getjoin() 上被阻止,尽管可能仍有正在进行的背景评估,其结果将不会在之后使用。这些后台评估不会阻止 JVM 终止。
  • 再次感谢,问题是我正在使用thenApply 链接其他操作,这些操作正在等待所有supplyAsync 调用完成。这是很棘手的事情,尽管 API 看起来相当简单。
猜你喜欢
  • 2013-06-19
  • 1970-01-01
  • 2021-04-21
  • 2021-05-17
  • 1970-01-01
  • 1970-01-01
  • 2011-03-03
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多