【问题标题】:How correctly reduce stream to another stream如何正确地将流减少到另一个流
【发布时间】:2017-03-13 11:04:19
【问题描述】:

我有字符串流和像

这样的空值
Stream<String> str1 = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);

我想将它简化为另一个流,其中任何非空字符串序列连接在一起,即喜欢

Stream<String> str2 = Stream.of("ABC", "", "D", "EF","G")

我发现的第一种方法 - 创建收集器,首先将完整的输入流减少为具有所有连接字符串列表的单个对象,然后从中创建新流:

class Acc1 {
  final private List<String> data = new ArrayList<>();
  final private StringBuilder sb = new StringBuilder();

  private void accept(final String s) {
    if (s != null) 
      sb.append(s);
    else {
      data.add(sb.toString());
      sb.setLength(0);
    }
  }

  public static Collector<String,Acc1,Stream<String>> collector() {
    return Collector.of(Acc1::new, Acc1::accept, (a,b)-> a, acc -> acc.data.stream());
  }
}
...
Stream<String> str2 = str.collect(Acc1.collector());

但在这种情况下,在任何使用 str2 之前,即使是 str2.findFirst(),输入流也会被完全处理。它消耗时间和内存的操作以及来自某个生成器的无限流它根本无法工作

另一种方式 - 创建将保持中间状态并在 flatMap() 中使用的外部对象:

class Acc2 {
  final private StringBuilder sb = new StringBuilder();

  Stream<String> accept(final String s) {
    if (s != null) {
      sb.append(s);
      return Stream.empty();
    } else {
      final String result = sb.toString();
      sb.setLength(0);
      return Stream.of(result);
    }
  }
}
...
Acc2 acc = new Acc2();
Stream<String> str2 = str1.flatMap(acc::accept);

在这种情况下,从 str1 将只检索真正通过 str2 访问的元素。

但是使用在流处理之外创建的外部对象对我来说看起来很难看,并且可能会导致一些我现在看不到的副作用。此外,如果 str2 稍后将与 parallelStream() 一起使用,则会导致不可预测的结果。

有没有更正确的流->流减少没有这些缺陷的实现?

【问题讨论】:

  • 有一个库“StreamEx”,它包含 grouptun 方法(不记得确切的名称)。使用它,您可以将您的流转换为填充和空列表的流。下一步是显而易见的。
  • @Josep Prat:您应该永远reduce 与修改其参数的函数一起使用。 OP 已经使用的 collect 方法是可变归约的正确方法。事实上,无论您使用reduce 还是collect,减少都会处理所有元素,不会改变。
  • @Josep Prat:您从一个空的 List 作为“身份函数”开始,并省略了该列表如何与累加器函数中的字符串一起播放,但显然已经考虑修改 List ,您还写了“组合器是列表连接 (addAll)”,如果不是修改输入列表之一的函数,List.addAll 是什么?
  • 好吧,你的评论不应该是一个完整的解决方案,但是你为什么建议使用reduce呢?与基于Collector 的解决方案相比,OP 已经在问题中发布的所谓优势是什么?
  • @Josep Prat:当然,当你只有不可变的数据结构时,没有必要区分。这也意味着该语言的库将提供处理这些结构的必要工具(例如返回新列表的列表连接,但可能会在运行时以某种方式在后台进行优化,以减少甚至消除复制开销)。但在这里,问题不在于如何减少……

标签: java java-8 java-stream


【解决方案1】:

归约或其可变变体collect 始终是一个将处理所有项目的操作。您的操作可以通过自定义Spliterator 来实现,例如

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private StringBuilder sb = new StringBuilder();
            private String last;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> last=str))
                    return false;
                while(last!=null) {
                    sb.append(last);
                    if(!sp.tryAdvance(str -> last=str)) break;
                }
                action.accept(sb.toString());
                sb=new StringBuilder();
                return true;
            }
        }, false);
}

生成预期的组,您可以使用它进行测试

joinGroups(Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null))
    .forEach(System.out::println);

但也有所需的惰性行为,可通过

测试
joinGroups(
    Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
          .peek(str -> System.out.println("consumed "+str))
).skip(1).filter(s->!s.isEmpty()).findFirst().ifPresent(System.out::println);

经过深思熟虑,我想到了这个更有效的变体。仅当至少有两个 Strings 加入时,它才会合并 StringBuilder,否则,它将简单地使用已经存在的唯一 String 实例或文字 "" 字符串作为空组:

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private String next;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> next=str))
                    return false;
                String string=next;
                if(string==null) string="";
                else if(sp.tryAdvance(str -> next=str) && next!=null) {
                    StringBuilder sb=new StringBuilder().append(string);
                    do sb.append(next);while(sp.tryAdvance(str -> next=str) && next!=null);
                    string=sb.toString();
                }
                action.accept(string);
                return true;
            }
        }, false);
}

【讨论】:

  • 谢谢,这正是我要找的,本机方法而不是技巧。 Btw iterator() 来自输入流而不是 spliterator() 可能更舒服
  • @rustot:使用Iterator 很有诱惑力,因为我们更熟悉它,因为它更老了,在这种特殊情况下,我们可能会侥幸逃脱,但更常见的是,在源Spliterator 上实现Spliterator 更直接,并且可能更有效,因为我们将大小估计和特征传递给结果流。此外,大多数源流在通过 Spliterator 遍历时会更高效,hasNextnext 方法之间没有逻辑拆分……
  • 2 次调用 hasNext + next vs 2 次调用 tryAdvance + 接受和临时创建 lambda 但我不关心效率,迭代器允许前瞻“它是流中的最新元素”并对其进行特殊处理
  • @rustot:我不只考虑调用的数量。更多的是关于底层源代码如何实现它。参见例如Files.lines,它由单个方法BufferedReader.readLine 支持,它将返回下一行或null,它与tryAdavance 逻辑完全匹配。相比之下,Iterator 必须轮询hasNext 中的下一行并记住它,以便next 可以返回它,但它还必须确保在没有next 的情况下多次调用hasNext 不会提前并且在没有hasNext 的情况下调用next 也是正确的。
  • 换句话说,hasNext 的“前瞻”在这里是一种伪装。它实际上是对下一项的轮询,就像tryAdvance一样,后面跟着一个存储操作,所以后面的next调用可以取回它。但是一个简单的if(hasNext()) next() 带有五个 条件。首先,next 必须检查next 是否已经被调用,然后它会轮询下一项,检查结果并返回条件,然后调用者评估boolean 结果并调用next,这将再次检查next 是否已被调用以及结果如何。
【解决方案2】:

使用标准 Stream API 很难实现这样的场景。在我的免费 StreamEx 库中,我使用允许执行所谓的“部分缩减”的方法扩展了标准 Stream 接口,这正是这里所需要的:

StreamEx<String> str1 = StreamEx.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);
Stream<String> str2 = str1.collapse((a, b) -> a != null,
                          MoreCollectors.filtering(Objects::nonNull, Collectors.joining()));
str2.map(x -> '"'+x+'"').forEach(System.out::println);

输出:

"ABC"
""
"D"
"EF"
"G"

StreamEx.collapse() 方法使用提供的收集器执行流的部分缩减。第一个参数是一个谓词,适用于两个相邻的原始项目,如果它们必须一起归约,则应返回 true。这里我们只要求第一个不为空((a, b) -&gt; a != null):这意味着每个组都以null 结尾,新组从这里开始。现在我们需要将组字母连接在一起:这可以通过标准的Collectors.joining()collector 来完成。但是我们还需要过滤掉null。我们可以使用MoreCollectors.filtering 收集器来实现(实际上相同的收集器将在 Java 9 的 Collectors 类中可用)。

这个实现是完全惰性的,对并行处理非常友好。

【讨论】:

  • 谢谢你,有趣的图书馆!
猜你喜欢
  • 1970-01-01
  • 2021-05-22
  • 1970-01-01
  • 1970-01-01
  • 2010-09-18
  • 2011-01-06
  • 2015-05-25
  • 2011-06-22
相关资源
最近更新 更多