【问题标题】:Finite generated Stream in Java - how to create one?Java中的有限生成流-如何创建一个?
【发布时间】:2016-07-03 01:18:54
【问题描述】:

在 Java 中,使用Stream.generate(supplier) 可以轻松生成无限流。但是,我需要生成一个最终会完成的流。

例如,想象一下,我想要一个目录中所有文件的流。文件的数量可能很大,因此我无法预先收集所有数据并从中创建一个流(通过collection.stream())。我需要逐个生成序列。但是流显然会在某个时候结束,并且像(collect()findAny())这样的终端操作员需要处理它,所以Stream.generate(supplier) 不适合这里。

在 Java 中是否有任何合理简单的方法可以做到这一点,而无需我自己实现整个 Stream 接口?

我可以想到一个简单的技巧 - 使用无限 Stream.generate(supplier) 进行操作,并在获取所有实际值时提供 null 或抛出异常。但它会破坏标准的流操作符,我只能将它与我自己知道这种行为的操作符一起使用。

澄清

cmets 中的人向我求婚takeWhile() 运营商。这不是我的意思。如何更好地表达这个问题......我不是在问如何过滤(或限制)现有的流,我是在问如何动态地创建(生成)流,无需预先加载所有元素,但流会有有限大小(事先未知)。

解决方案

我要找的代码是

    Iterator it = myCustomIteratorThatGeneratesTheSequence();
    StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT), false);

我刚刚研究了java.nio.file.Fileslist(path) 方法是如何实现的。

【问题讨论】:

  • 我想我不明白。您是否正在寻找某种takeWhile,例如这里的stackoverflow.com/q/20746429/1743880
  • 你看过IntStream.range和朋友这样的方法吗?
  • openjdk 9 提供takeWhile()
  • @JanXMarek 一种方法是使用此答案stackoverflow.com/a/17959135/3973077 创建Iterator<File>,然后使用此stackoverflow.com/a/24511534/3973077Iterator 转换为Stream。这样迭代器只会在内存中保存一小堆文件,并且流将被延迟评估。虽然工作量很大。
  • 我强烈建议先检查Spliterator 的逻辑是否更合适,然后再执行更复杂的Iterator 以将其包装在Spliterator 中。有关示例,请参见 this answer...

标签: java java-8 java-stream


【解决方案1】:

在 Java 中是否有任何合理的简单方法可以做到这一点,而无需我自己实现整个 Stream 接口?

一个简单的.limit() 保证它会终止。但这并不总是足够强大。

Stream 工厂方法之后,在不重新实现流处理管道的情况下创建自定义流源的最简单方法是将java.util.Spliterators.AbstractSpliterator<T> 子类化并将其传递给java.util.stream.StreamSupport.stream(Supplier<? extends Spliterator<T>>, int, boolean)

如果您打算使用并行流,请注意 AbstractSpliterator 只会产生次优拆分。如果您对源代码有更多的控制权,那么完全实现Spliterator 接口会更好。

例如,以下 sn-p 将创建一个 Stream 提供无限序列 1,2,3...

在该特定示例中,您可以使用 IntStream.range()

但是流显然会在某个时候结束,像 (collect() 或 findAny()) 这样的终端操作符需要处理它。

findAny() 这样的短路操作实际上可以在无限流上完成,只要有任何匹配的元素。

Java 9 引入了Stream.iterate 来为一些简单的情况生成有限流。

【讨论】:

    【解决方案2】:

    Kotlin 代码从 InputStream 创建 JsonNode 流

    
        private fun InputStream.toJsonNodeStream(): Stream<JsonNode> {
            return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(this.toJsonNodeIterator(), Spliterator.ORDERED),
                    false
            )
        }
    
        private fun InputStream.toJsonNodeIterator(): Iterator<JsonNode> {
            val jsonParser = objectMapper.factory.createParser(this)
    
            return object: Iterator<JsonNode> {
    
                override fun hasNext(): Boolean {
                    var token = jsonParser.nextToken()
                    while (token != null) {
                        if (token == JsonToken.START_OBJECT) {
                            return true
                        }
                        token = jsonParser.nextToken()
                    }
                    return false
                }
    
                override fun next(): JsonNode {
                    return jsonParser.readValueAsTree()
                }
            }
        }
    
    

    【讨论】:

      【解决方案3】:

      这是一个自定义且有限的流:

      package org.tom.stream;
      import java.util.*;
      import java.util.function.*;
      import java.util.stream.*;
      
      public class GoldenStreams {
      private static final String IDENTITY = "";
      
      public static void main(String[] args) {
          Stream<String> stream = java.util.stream.StreamSupport.stream(new Spliterator<String>() {
              private static final int LIMIT = 25;
              private int integer = Integer.MAX_VALUE;
              {
                  integer = 0;
              }
              @Override
              public int characteristics() {
                  return Spliterator.DISTINCT;
              }
              @Override
              public long estimateSize() {
                  return LIMIT-integer;
              }
              @Override
              public boolean tryAdvance(Consumer<? super String> arg0) {
                  arg0.accept(IDENTITY+integer++);
                  return integer < 25;
              }
              @Override
              public Spliterator<String> trySplit() {
                  System.out.println("trySplit");
                  return null;
              }}, false);
          List<String> peeks = new ArrayList<String>();
          List<String> reds = new ArrayList<String>();
          stream.peek(data->{
              peeks.add(data);
          }).filter(data-> {
              return Integer.parseInt(data)%2>0;
          }).peek(data ->{
              System.out.println("peekDeux:"+data);
          }).reduce(IDENTITY,(accumulation,input)->{
              reds.add(input);
              String concat = accumulation + ( accumulation.isEmpty() ? IDENTITY : ":") + input;
              System.out.println("reduce:"+concat);
              return concat;
          });
          System.out.println("Peeks:"+peeks.toString());
          System.out.println("Reduction:"+reds.toString());
      }
      }
      

      【讨论】:

        猜你喜欢
        • 2023-03-17
        • 2017-07-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-03-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多