【问题标题】:Can I duplicate a Stream in Java 8?我可以在 Java 8 中复制流吗?
【发布时间】:2014-06-29 09:36:33
【问题描述】:

有时我想对流执行一组操作,然后使用其他操作以两种不同的方式处理生成的流。

我可以在不指定两次常用初始操作的情况下执行此操作吗?

例如,我希望存在如下dup() 方法:

Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10

【问题讨论】:

  • 我确实意识到不会有性能提升,因为流是懒惰地评估的;我只是希望避免重复代码。
  • 为什么不把流变成列表呢?
  • 找到代码中的变化并将其提取到变量中。然后创建一个方法来提取可重用的代码片段并对其应用变量。
  • @Elazar 这样做不会节省内存,并且不适用于无限流!
  • 没有任何东西可以复制一般的无限流,没有更多信息。

标签: java java-8 java-stream


【解决方案1】:

无法以这种方式复制流。但是,您可以通过将公共部分移动到方法或 lambda 表达式中来避免代码重复。

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);

【讨论】:

  • 我正在考虑将接受的答案切换为 Elazar 的答案,并链接到您的答案,作为第二种解决方案的一个很好的例子,以及我在问题中使用的具体示例的解决方案。希望没关系。谢谢!
  • @necromancer:谢谢你的提问。随意更改接受的答案。
【解决方案2】:

一般情况下是不可能的。

如果您想复制输入流或输入迭代器,您有两种选择:

A.将所有内容保存在一个集合中,例如List&lt;&gt;

假设您将一个流复制为两个流s1s2。如果您在s1n2 元素中有高级n1 元素和s2,则必须将|n2 - n1| 元素保留在内存中,以保持同步。如果您的流是无限的,则所需的存储空间可能没有上限。

看看 Python 的 tee() 看看它需要什么:

此迭代工具可能需要大量辅助存储(取决于需要存储多少临时数据)。一般来说,如果一个迭代器在另一个迭代器启动之前使用了大部分或全部数据,那么使用list() 而不是tee() 会更快。

B.如果可能:复制创建元素的生成器的状态

要使此选项起作用,您可能需要访问流的内部工作原理。换句话说,生成器——创建元素的部分——应该首先支持复制。 [OP:请参阅此great answer,作为问题中示例如何完成此操作的示例]

它不适用于用户的输入,因为您必须复制整个“外部世界”的状态。 Java 的Stream 不支持复制,因为它被设计为尽可能通用;例如,处理文件、网络、键盘、传感器、随机性等。 [OP:另一个例子是按需读取温度传感器的流。不存储读数副本就无法复制]

这不仅在 Java 中如此;这是一般规则。可以看到,C++ 中的std::istream 仅支持移动语义,不支持复制语义(“复制构造函数(已删除)”),出于这个原因(以及其他原因)。

【讨论】:

  • +1 很棒的答案;可能会接受并链接到当前接受的答案作为“B”点的具体示例。
  • 阻塞队列将是一种允许有界存储问题的解决方案,其中第一个流的读取器将被阻塞,直到第二个流被消耗。自然,并不总是适用,但可能适用于某些用例,尤其是。缓冲区很大。
  • 请注意,您也许可以压缩 n2 - n1 元素,尽管我认为这不太实用。
  • 当我遇到这种类型的问题时(不是那么频繁,因为我对 java.streams 比较陌生!)我最初的直觉是选择选项 A。但是我一直对此感到不安,它似乎我们正在离开流世界,然后又回到......只是为了重复。我认为我非常赞成选项 B。我在我的项目中使用它,它看起来可以按预期工作。从表面上看,可能需要重复处理,我想这对于 Streams API 的作者来说更像是一个问题??
【解决方案3】:

如果您正在缓冲已在一个副本中使用但尚未在另一副本中使用的元素,则有可能。

我们在jOOλ 中为流实现了duplicate() 方法,这是我们为改进jOOQ 的集成测试而创建的一个开源库。本质上,你可以写:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();

(注意:我们目前需要对流进行装箱,因为我们还没有实现IntSeq

在内部,有一个LinkedList 缓冲区存储已从一个流中使用但未从另一个流中使用的所有值。如果您的两个流以相同的速率消耗,这可能会尽可能高效。

算法的工作原理如下:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

More source code here

事实上,使用jOOλ,你就可以像这样写一个完整的单行:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
 .map1(s -> s.filter(n -> n % 7 == 0))
 .map2(s -> s.filter(n -> n % 5 == 0));

// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println)

// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);

【讨论】:

  • 谢谢,但当前接受的答案确实指出:“如果您在 s1 中具有高级 n1 元素,并且在 s2 中具有 n2 元素,则必须将 |n2 - n1| 元素保留在内存中,以跟上步伐。如果您的流是无限的,则所需的存储空间将没有上限。”
【解决方案4】:

您还可以将流生成移动到返回此流并调用两次的单独方法/函数中。

【讨论】:

    【解决方案5】:

    要么,

    • 将初始化移动到一个方法中,然后再次调用该方法

    这具有明确说明您在做什么的优点,并且也适用于无限流。

    • 收集流,然后重新流式传输

    在你的例子中:

    final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();
    

    然后

    final IntStream s = IntStream.of(arr);
    

    【讨论】:

    • 谢谢,我意识到有一个更简单的答案(见我自己的答案);收集流的内存效率不高,根本不适用于无限流。
    • 您的回答不涉及处理流。根据您的问题,我了解到您想获取单个流并说,将其收集到 Mapsum() 它。您只是在着手设置管道。
    【解决方案6】:

    更新:不起作用。请参阅下面的解释,在原始答案的文本之后。

    我真傻。我需要做的就是:

    Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0);
    Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14
    Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10
    

    解释为什么这不起作用:

    如果您对其进行编码并尝试收集两个流,第一个将正常收集,但尝试对第二个流进行流式处理将引发异常:java.lang.IllegalStateException: stream has already been operated upon or closed

    详细地说,流是有状态的对象(顺便说一下,它不能重置或倒带)。您可以将它们视为迭代器,而迭代器又类似于指针。所以stream14stream10 可以被认为是对同一个指针的引用。一直使用第一个流将导致指针“越过终点”。尝试消费第二个流就像尝试访问一个已经“结束”的指针,这自然是非法操作。

    正如公认的答案所示,创建流的代码必须执行两次,但可以划分为 Supplier lambda 或类似的构造。

    完整的测试代码:保存到Foo.java,然后是javac Foo.java,然后是java Foo

    import java.util.stream.IntStream;
    
    public class Foo {
      public static void main (String [] args) {
        IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0);
        IntStream s1 = s.filter(n -> n % 5 == 0);
        s1.forEach(n -> System.out.println(n));
        IntStream s2 = s.filter(n -> n % 7 == 0);
        s2.forEach(n -> System.out.println(n));
      }
    }
    

    输出:

    $ javac Foo.java
    $ java Foo
    0
    10
    20
    30
    40
    50
    60
    70
    80
    90
    Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
        at java.util.stream.AbstractPipeline.<init>(AbstractPipeline.java:203)
        at java.util.stream.IntPipeline.<init>(IntPipeline.java:91)
        at java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.java:592)
        at java.util.stream.IntPipeline$9.<init>(IntPipeline.java:332)
        at java.util.stream.IntPipeline.filter(IntPipeline.java:331)
        at Foo.main(Foo.java:8)
    

    【讨论】:

    • 不可变状态,好吧,给你。 ;D
    • 您可以添加一些解释为什么这种方法不起作用,而不是删除。以便其他人可以从中学习。
    • @mschenk74 给你 :)
    【解决方案7】:

    从 Java 12 开始,我们有了 Collectors::teeing,它允许我们将主流管道的元素传递给 2 个或更多下游收集器。

    根据您的示例,我们可以执行以下操作:

    @Test
    void shouldProcessStreamElementsInTwoSeparateDownstreams() {
        class Result {
            List<Integer> multiplesOf7;
            List<Integer> multiplesOf5;
    
            Result(List<Integer> multiplesOf7, List<Integer> multiplesOf5) {
                this.multiplesOf7 = multiplesOf7;
                this.multiplesOf5 = multiplesOf5;
            }
        }
    
        var result = IntStream.range(1, 100)
                .filter(n -> n % 2 == 0)
                .boxed()
                .collect(Collectors.teeing(
                        Collectors.filtering(n -> n % 7 == 0, Collectors.toList()),
                        Collectors.filtering(n -> n % 5 == 0, Collectors.toList()),
                        Result::new
                ));
    
        assertTrue(result.multiplesOf7.stream().allMatch(n -> n % 7 == 0));
        assertTrue(result.multiplesOf5.stream().allMatch( n -> n % 5 == 0));
    }
    

    还有许多其他收集器允许做其他事情,例如通过在下游使用Collectors::mapping,您可以从同一来源获得两个不同的对象/类型,如this article 所示。

    【讨论】:

      【解决方案8】:

      对于非无限流,如果您可以访问源,则直接:

      @Test
      public void testName() throws Exception {
          List<Integer> integers = Arrays.asList(1, 2, 4, 5, 6, 7, 8, 9, 10);
          Stream<Integer> stream1 = integers.stream();
          Stream<Integer> stream2 = integers.stream();
      
          stream1.forEach(System.out::println);
          stream2.forEach(System.out::println);
      }
      

      打印

      1 2 4 5 6 7 8 9 10

      1 2 4 5 6 7 8 9 10

      对于您的情况:

      Stream originalStream = IntStream.range(1, 100).filter(n -> n % 2 == 0)
      
      List<Integer> listOf = originalStream.collect(Collectors.toList())
      
      Stream stream14 = listOf.stream().filter(n -> n % 7 == 0);
      Stream stream10 = listOf.stream().filter(n -> n % 5 == 0);
      

      对于性能等,请阅读其他人的答案;)

      【讨论】:

      • 谢谢,但问题的实质是tee 一个源不可访问或不确定的流,例如标准输入。我现在很清楚,这种开球的成本主要源于有效消费率的差异。 e. G。如果一个消费者远远领先于另一个消费者,则需要单独额外缓冲差异。
      【解决方案9】:

      我用this 很好的答案写了以下课程:

      public class SplitStream<T> implements Stream<T> {
          private final Supplier<Stream<T>> streamSupplier;
      
          public SplitStream(Supplier<Stream<T>> t) {
              this.streamSupplier = t;
          }
      
          @Override
          public Stream<T> filter(Predicate<? super T> predicate) {
              return streamSupplier.get().filter(predicate);
          }
      
          @Override
          public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
              return streamSupplier.get().map(mapper);
          }
      
          @Override
          public IntStream mapToInt(ToIntFunction<? super T> mapper) {
              return streamSupplier.get().mapToInt(mapper);
          }
      
          @Override
          public LongStream mapToLong(ToLongFunction<? super T> mapper) {
              return streamSupplier.get().mapToLong(mapper);
          }
      
          @Override
          public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
              return streamSupplier.get().mapToDouble(mapper);
          }
      
          @Override
          public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
              return streamSupplier.get().flatMap(mapper);
          }
      
          @Override
          public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
              return streamSupplier.get().flatMapToInt(mapper);
          }
      
          @Override
          public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
              return streamSupplier.get().flatMapToLong(mapper);
          }
      
          @Override
          public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
              return streamSupplier.get().flatMapToDouble(mapper);
          }
      
          @Override
          public Stream<T> distinct() {
              return streamSupplier.get().distinct();
          }
      
          @Override
          public Stream<T> sorted() {
              return streamSupplier.get().sorted();
          }
      
          @Override
          public Stream<T> sorted(Comparator<? super T> comparator) {
              return streamSupplier.get().sorted(comparator);
          }
      
          @Override
          public Stream<T> peek(Consumer<? super T> action) {
              return streamSupplier.get().peek(action);
          }
      
          @Override
          public Stream<T> limit(long maxSize) {
              return streamSupplier.get().limit(maxSize);
          }
      
          @Override
          public Stream<T> skip(long n) {
              return streamSupplier.get().skip(n);
          }
      
          @Override
          public void forEach(Consumer<? super T> action) {
              streamSupplier.get().forEach(action);
          }
      
          @Override
          public void forEachOrdered(Consumer<? super T> action) {
              streamSupplier.get().forEachOrdered(action);
          }
      
          @Override
          public Object[] toArray() {
              return streamSupplier.get().toArray();
          }
      
          @Override
          public <A> A[] toArray(IntFunction<A[]> generator) {
              return streamSupplier.get().toArray(generator);
          }
      
          @Override
          public T reduce(T identity, BinaryOperator<T> accumulator) {
              return streamSupplier.get().reduce(identity, accumulator);
          }
      
          @Override
          public Optional<T> reduce(BinaryOperator<T> accumulator) {
              return streamSupplier.get().reduce(accumulator);
          }
      
          @Override
          public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
              return streamSupplier.get().reduce(identity, accumulator, combiner);
          }
      
          @Override
          public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
              return streamSupplier.get().collect(supplier, accumulator, combiner);
          }
      
          @Override
          public <R, A> R collect(Collector<? super T, A, R> collector) {
              return streamSupplier.get().collect(collector);
          }
      
          @Override
          public Optional<T> min(Comparator<? super T> comparator) {
              return streamSupplier.get().min(comparator);
          }
      
          @Override
          public Optional<T> max(Comparator<? super T> comparator) {
              return streamSupplier.get().max(comparator);
          }
      
          @Override
          public long count() {
              return streamSupplier.get().count();
          }
      
          @Override
          public boolean anyMatch(Predicate<? super T> predicate) {
              return streamSupplier.get().anyMatch(predicate);
          }
      
          @Override
          public boolean allMatch(Predicate<? super T> predicate) {
              return streamSupplier.get().allMatch(predicate);
          }
      
          @Override
          public boolean noneMatch(Predicate<? super T> predicate) {
              return streamSupplier.get().noneMatch(predicate);
          }
      
          @Override
          public Optional<T> findFirst() {
              return streamSupplier.get().findFirst();
          }
      
          @Override
          public Optional<T> findAny() {
              return streamSupplier.get().findAny();
          }
      
          @Override
          public Iterator<T> iterator() {
              return streamSupplier.get().iterator();
          }
      
          @Override
          public Spliterator<T> spliterator() {
              return streamSupplier.get().spliterator();
          }
      
          @Override
          public boolean isParallel() {
              return streamSupplier.get().isParallel();
          }
      
          @Override
          public Stream<T> sequential() {
              return streamSupplier.get().sequential();
          }
      
          @Override
          public Stream<T> parallel() {
              return streamSupplier.get().parallel();
          }
      
          @Override
          public Stream<T> unordered() {
              return streamSupplier.get().unordered();
          }
      
          @Override
          public Stream<T> onClose(Runnable closeHandler) {
              return streamSupplier.get().onClose(closeHandler);
          }
      
          @Override
          public void close() {
              streamSupplier.get().close();
          }
      }
      

      当你调用它的类的任何方法时,它会将调用委托给

      streamSupplier.get()
      

      所以,而不是:

      Supplier<IntStream> supplier = () ->
          IntStream.range(1, 100).filter(n -> n % 2 == 0);
      supplier.get().filter(...);
      supplier.get().filter(...);
      

      你可以这样做:

      SplitStream<Integer> stream = 
          new SplitStream<>(() -> IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed());
      stream.filter(...);
      stream.filter(...);
      

      您可以将其扩展为与 IntStream、DoubleStream 等一起使用...

      【讨论】:

        【解决方案10】:

        我认为将 Concat 与空流一起使用可以满足您的需求。 试试这样的:

        Stream<Integer> concat = Stream.concat(Stream.of(1, 2), Stream.empty());
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-01-04
          • 2017-08-08
          • 1970-01-01
          • 2017-01-26
          • 2012-03-03
          • 2014-04-26
          相关资源
          最近更新 更多