【问题标题】:OutOfMemoryException with Stream ProcessingOutOfMemoryException 与流处理
【发布时间】:2014-03-21 01:20:30
【问题描述】:

我写了一个函数,给定一个Stream[Long],它将过滤掉> 4,000,000的数字。

  def filterLt4Mil(xs: Stream[Long]) = {
    @tailrec
    def go(xs: Stream[Long], acc: Stream[Long]): Stream[Long] = xs match {
      case Stream() => acc
      case a #:: as if(a < 4000000L) => go(as, acc :+ a)
      case a #:: as if(a > 4000000L) => go(as, acc)
    }
    go(xs, Stream[Long]())
  }

但是,当传入一个范围从 0 到 10,000,000 百万的 Stream 时,我得到一个 OutOfMemoryException

scala> val x = Stream.range(0,10000000L)
x: scala.collection.immutable.Stream[Long] = Stream(0, ?)

scala> filterLt4Mil(x)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.immutable.List.toStream(List.scala:312)

由于filterLt4Mil 使用了尾调用优化,我的理解是堆栈不应该溢出。

但是,为什么会出现 OutOfMemoryException?如何预防?

【问题讨论】:

  • 如果省略 val x = 行会发生什么(即,将其写为 filterLt4Mil(Stream.range(0,10000000L))?

标签: scala stream


【解决方案1】:

你的函数有一些问题。

您的go 函数具有反转输出流中的元素的效果,因为您将追加到累加器的末尾。

此外,追加而不是前置的行为会导致整个累加器具体化为一个列表,然后在迭代的每个步骤中复制该列表。这就是导致 OutOfMemoryError 的原因,因为您的实现需要二次内存。

此外,您的 match 块并不详尽,因为您不处理 a == 4000000L 的情况。

如果您只想过滤掉大于 4000000 的元素,这就足够了:

xs filterNot {_ > 4000000L}

【讨论】:

  • 您能详细说明二次记忆吗?
  • 您在每次迭代时复制整个累加器,这意味着消耗的内存是流中元素的数量乘以累加器的大小,平均而言,它是流,这意味着您在内存中消耗了流大小平方的 1/2。这会定期进行垃圾收集,因为您在每次迭代后丢弃旧副本,但对于大型流,JVM 最终会在垃圾收集上花费太多时间并放弃。
【解决方案2】:

嗯,已经有一个filter 用于流的方法,但我猜你想自己做。

流有严格的头部和懒惰的尾部。调用Stream.range(0,10000000L) 只会在内存中保留0,因为其余部分将被延迟评估。

但是在您的filterLt4Mil 方法中,您遍历整个流,导致该流中的所有元素都被评估,从而导致从010000000L 的所有数字都存储在内存中... 存储这样的数量会导致OutOfMemoryException

例如:x.forall(_ &gt;= 0) 也会导致OutOfMemoryExceptionforall 也必须遍历整个流,同时满足_ &gt;= 0

你会希望你的 filter 方法被懒惰地应用。看看源代码。可能会让您了解它的工作原理: (http://harrah.github.io/browse/samples/library/scala/collection/immutable/Stream.scala.html)

override final def filter(p: A => Boolean): Stream[A] = {
  // optimization: drop leading prefix of elems for which f returns false
  var rest = this dropWhile (!p(_))
  if (rest.isEmpty) Stream.Empty
  else new Stream.Cons(rest.head, rest.tail filter p)
}

还有一个关于您的代码的问题。由于它必须遍历整个流,因此您可以猜测无限流会发生什么。

【讨论】:

    【解决方案3】:

    我认为你没有破坏堆栈 - 看起来你的堆用完了(确认,切换到使用 Int(s) 而不是 Long(s))。 scala 编译器应该知道如何对这个函数进行尾递归优化。

    10m 长是 64m。如果您要创建 64m 列表的 10m 实例,那么您就有麻烦了。

    此外,您可能希望拥有惰性流,而不仅仅是尾递归,以便此函数进行内存优化。 64m 本身并没有那么大,所以这个优化可能不需要。

    在我看来,您要么创建 10m 份 XS 参数副本,要么创建 ACC 副本。

    对我来说,xs: 参数似乎很懒惰;但是您要从函数外部对其进行引用,并且流是 scala 中的记忆器。

    所以我敢打赌那个参数是你得到数百万个副本的那个(即,scala 编译器不知道放弃中间值)。另一个参数应该在每次调用时收集垃圾(或者,因为我们正在谈论尾递归,所以在内存列表中优化到一个 64m 长)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-02-02
      • 2018-02-09
      • 2023-02-22
      • 1970-01-01
      • 2014-08-04
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多