【问题标题】:Java 8 Streams - Timeout?Java 8 流 - 超时?
【发布时间】:2017-08-01 08:11:13
【问题描述】:

我想遍历一个巨大的数组并执行一组需要很长时间的复杂指令。但是,如果超过 30 秒,我希望它放弃。

例如

final long start = System.currentTimeMillis();
myDataStructure.stream()
    .while(() -> System.currentTimeMillis() <= start + 30000)
    .forEach(e ->
    {
      ...
    });

如果满足特定条件,我想避免在 forEach 调用中只说 return

【问题讨论】:

  • stackoverflow.com/questions/41392286/… 这是一个示例答案,也许对你有帮助。
  • 如果您正在计算副作用的变化并在之后应用它们(假设计算比应用昂贵得多),您可以使用Collector 的特殊实现,当超时。
  • 此外,Java 9 Streams 将有一个新的 takeWhile 方法,这可能正是您所要求的。
  • 问题...复杂的指令是 IO-bound 还是 CPU-bound?

标签: java java-8 java-stream


【解决方案1】:

我会为此创建一个自定义池,例如:

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
    try {
        forkJoinPool.submit(() ->
        IntStream.range(1, 1_000_000).filter(x -> x > 2).boxed().collect(Collectors.toList()))
                .get(30, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // job not done in your interval
    }

【讨论】:

  • 据我了解,这应该在超时时提前返回,但是 fork 连接池如何停止处理以不消耗不必要的 CPU 资源?在我看来,它将继续在后台处理流。另外,您的代码中的 OP 操作在哪里?
  • @SpaceTrucker 你可以在超时时显式调用shutDownshutDownNow。关于代码 - 这只是一个例子......
  • 我认为您的解决方案是最好的解决方案。 +1
  • @SpaceTrucker 或者,来自ForkJoinPoolsubmit 方法返回一个Future,可以通过执行future.cancel(true) 来取消它,这将中断提交的任务,如果编写得当,可以以这种方式停止。
【解决方案2】:

如果与实际执行操作相比,在这种情况下迭代流或数组的成本更低,那么只需使用谓词并过滤时间是否结束。

final long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(30L);
myDataStructure.stream()
    .filter(e -> System.nanoTime() <= end)
    .forEach(e ->
    {
      ...
    });

问题是您是否需要知道哪些元素已被处理。使用上述方法,您必须检查之后是否对特定元素产生了副作用。

【讨论】:

  • 哦,我没想到会这样使用filter
  • 好主意,但如果流很大,在 30 秒后为每个元素评估 System.currentTimeMillis() &lt;= start + 30000L 可能会很浪费。
  • @Eran 是的,看起来像带有get 的自定义池会更合适
  • @Eran 那么也许是takeWhile() 而不是filter()?但是真正使用这样一个不纯的谓词并不是流API的风格,我认为普通的for循环会更好。
【解决方案3】:

由于 Stream forEach 没有 break,我认为您可以为此创建 Custom Exceptionbreak 循环:

myDataStructure.stream()
    .forEach(e ->
    {
      if (System.currentTimeMillis() <= start + 30000) {
          throw new MyTimeOutException()
      }
    });

您可以为 catch this 捕获此 Exception

【讨论】:

  • @Hatefiend 该异常仅在 30 秒后抛出一次。我认为创建和抛出异常的开销可以忽略不计。
  • 你打错了,应该是30000而不是30,因为它以毫秒为单位
  • 嗨@Hatefiend,这篇文章指出Stream 不能在没有catch重新抛出的情况下抛出checked exception。对于上面的代码,它会在代码块中抛出自定义Exception。它应该可以工作。
  • 这会破坏并行流
  • @SpaceTrucker:确实,30 秒内出现一个异常并没有什么坏处。此外,MyTimeOutException 可以使用a super constructor that disables stack traces 来避免异常中最昂贵的部分。
【解决方案4】:

您可以使用.allMatch() 是一个短路运算符来终止流:

final long start = System.currentTimeMillis();
myDataStructure.stream()
    .allMatch(e ->
    {
      // your task here
        return System.currentTimeMillis() <= start + 30000;
    });

【讨论】:

  • 这会起作用,但allMatch 会终止流。我目前正在使用anyMatch,两者不能很好地混合在一起。我也不能用anyMatch 短路。
  • @Hatefiend 是问题中描述的问题的解决方案。如果您有现在描述的限制/边界条件/其他要求,最好在问题/问题陈述中提及这些内容
【解决方案5】:

正如 cmets 在 OP 下所说,Java 8 中缺少 takeWhile/dropWhile(将在 Java 9 中添加)。没有任何理由尝试通过异常或其他代码来实现逻辑,因为代码看起来如此丑陋和完全没有场景,即使它只是为了练习。我认为使用 3rd 方库是一个更好的解决方案,例如 StreamEx

StreamEx(source).takeWhile(() -> System.currentTimeMillis() <= start + 30000)
                .forEach(e -> { ... });

【讨论】:

    猜你喜欢
    • 2017-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-18
    • 2014-07-23
    • 2016-12-06
    相关资源
    最近更新 更多