【问题标题】:How to design a returned stream that may use skip如何设计可能使用跳过的返回流
【发布时间】:2016-06-29 15:55:23
【问题描述】:

我创建了一个解析库,它接受提供的输入并返回记录流。然后一个程序调用这个库并处理结果。就我而言,我的程序正在使用类似

recordStream.forEach(r -> insertIntoDB(r));

可以提供给解析库的输入类型之一是平面文件,它可能有一个标题行。因此,解析库可以配置为跳过标题行。如果配置了标题行,它会在返回中添加一个 skip(n) 元素,例如

Files.lines(input)**.skip(1)**.parallel().map(r -> createRecord(r));  

解析库返回结果流。

但是,skip、parallel 和 forEach 似乎不能很好地结合在一起,最终程序员必须改为调用 forEachOrdered,但是把这个要求放在程序员身上是很糟糕的设计,期望他们知道在处理时必须使用 forEachOrdered带有标题行的文件的输入类型。

我如何在必要时在返回的流链的构造中自己执行有序要求,以将功能齐全的流返回给程序编写者,而不是具有隐藏限制的流?将流包装在另一个流中的答案是什么?

【问题讨论】:

  • 最简单的解决方案是要求 JRE 是最新的,即1.8u60 或更新版本...

标签: java design-patterns java-stream encapsulation


【解决方案1】:

forEachOrdered 是必要的,不是因为skip(),而是因为您的 Stream 是并行的。即使流是并行的,流也会跳过 first 元素,如文档中所示:

虽然 skip() 在顺序流管道上通常是一种便宜的操作,但在有序并行管道上可能会非常昂贵,尤其是对于较大的 n 值,因为 skip(n) 不仅限于跳过任何 n 个元素,而且相遇顺序中的前 n 个元素。

已明确记录 forEach 不一定尊重订单。当您关心订单时不使用forEachOrdered只是对Stream API的滥用:

此操作的行为是明确的不确定性。对于并行流管道,此操作不能保证尊重流的遇到顺序,因为这样做会牺牲并行性的好处。

我不会从库中返回并行流。我会返回一个连续的(其中 forEach 会尊重顺序),并让调用者调用parallel() 并根据需要承担后果。

默认使用并行流是bad idea

【讨论】:

  • 即使我删除了parallel(),如果添加parallel(),最终程序员也会有使用forEachOrdered()的意外要求,仅在某些情况下。无论最终程序员做什么或不做什么,我都需要跳过可靠地工作。我继续在库中使用skip()forEachOrdered()(仅在必要时)从最终程序员封装它并将结果提供给新的流。
  • 一点也不意外。它被清楚地记录在案,因此任何知道如何使用流 API 的人都期望它。而且不仅在某些情况下。每次使用并行流时,都不能依赖 forEach 来尊重顺序。就像,每次你从多个线程执行相同的任务时,你不能期望这些任务是按顺序执行的。最后,正如我在回答中所说,skip() 确实工作可靠,无论流是否并行。
  • 如果在流并行时 skip() 不能可靠地跳过第一行,这意味着您的流没有排序。但是 Files.lines() 确实返回了一个有序的流。
  • 在这里进一步讨论:stackoverflow.com/questions/28259636/…
  • 啊,很好的发现。因此,如果我得到正确的答案,现在已经修复了 JDK 中的一个错误。
【解决方案2】:

考虑到相关场景

  • 使用skip设置流源
  • 客户端代码正在请求parallel()执行
  • 客户端代码正在链接一个无序的终端操作,例如forEach
  • 代码在早于1.8u60 的 JRE 上运行

我们有相当特殊的情况组合,所有情况都不受将链接.map(r -> createRecord(r)) 操作的特定库函数的控制。

我认为责任不在这一点上。好吧,一般来说,应用程序代码不负责修复已被识别为 JRE 错误并在最新版本中修复的内容。

如果出于某种原因您认为有必要为较旧的 JRE 提供解决方法,则由需要 skip 操作的流源来执行此操作。

对于这种特殊情况,这并不难。您可以create the BufferedReader directly,调用readLine() 跳过第一行,然后返回lines() 的结果,这允许处理所有剩余的行。作为带有skip 操作的并行流,这可能会更有效。

更通用的解决方案是这样的“急切先跳过”操作:

public static <T> Stream<T> skipFirstImmediately(Stream<T> source) {
    Spliterator<T> sp=source.spliterator();
    sp.tryAdvance(skipped -> {});
    return StreamSupport.stream(sp, source.isParallel());
}

请注意,在使用此方法时,由于当前 Stream 实现的属性,如果需要并行执行,在调用此方法之前将源 Stream 转换为并行而不是将生成的 Stream 转换为并行可能是有益的。

这可以通过比较输出来验证

skipFirstImmediately(IntStream.range(0, 10).parallel().boxed())
    .peek(x -> System.out.println(Thread.currentThread()))
    .forEach(System.out::println);

skipFirstImmediately(IntStream.range(0, 10).boxed()).parallel()
    .peek(x -> System.out.println(Thread.currentThread()))
    .forEach(System.out::println);

这在任何一种情况下都是正确的,但不能在后者中利用 SMP 功能。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-01-29
    • 2023-03-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多