【问题标题】:Dividing Scala iterators leads to GCoverhead/JavaHeapSpace problems划分 Scala 迭代器会导致 GCoverhead/JavaHeapSpace 问题
【发布时间】:2012-06-12 15:19:12
【问题描述】:

我正在使用 Scala 处理大数据,因此内存和时间对我来说比通常更重要。我试图通过在大型源文件上细分getLines 获得的初始Iterator[String] 来提高某些评估的速度,以便并行进行一些子评估并合并结果。我通过递归slice-ing 将迭代器分成两半并在每个子迭代器上调用递归函数来做到这一点。 现在,我想知道为什么我得到 GCoverhead 或 JavaHeapSpace 异常,虽然“关键”元素只在递归步骤之前评估一次(为了获得迭代器的大小),但在我看来不在递归步骤中,因为slice 再次返回一个迭代器(这在实现上是非严格的)。在连接子列表之前,以下(简化!)代码将无法应用于 ~15g 文件。

我在每个步骤中都使用.duplicate。我查看了 api,.duplicate 的文档说“实现可能为一个迭代器迭代但另一个迭代器尚未迭代的元素分配临时存储空间。”,但还没有元素被迭代。有人可以给我一个提示,那里出了什么问题以及如何解决这个问题?非常感谢!

type itType = Iterator[String]
def src = io.Source.fromFile(args(0)).getLines

// recursively divide into equal size blocks in divide&conquer fashion
def getSubItsDC(it: itType, depth: Int = 4) = {
    println("Getting length of file..")
    val totalSize = src.length
    println(totalSize)
    def rec(it_rec: itType = it, depth_rec: Int = depth, size: Int = totalSize): 
        List[itType] = depth_rec match {
            case n if n > 0 => 
                println(n)
                val (it1, it2) = it_rec.duplicate
                val newSize = size/2
                rec(it1 slice (0,newSize), n-1, newSize) ++ 
                    rec(it2 slice (newSize,size), n-1, newSize)
            case n if n == 0 => List(it_rec)
    }
    println("Starting recursion..")
    rec()
}
getSubItsDC(src)

在 REPL 中,代码运行速度与任意大小的迭代器一样快(当硬编码 totalSize 时),因此我假设了正确的惰性。

【问题讨论】:

    标签: scala iterator duplicates slice divide


    【解决方案1】:

    我认为您最好使用itr grouped size 获得Iterator[Iterator[String]]GroupedIterator):

    scala> val itr = (1 to 100000000).iterator grouped 1000000
    itr: Iterator[Int]#GroupedIterator[Int] = non-empty iterator
    

    这将允许您对文件的某些部分进行分块处理。

    为什么您的解决方案使用了太多内存

    复制Iterator显然是一种操作,这意味着迭代器可能必须缓存其计算值。例如:

    scala> val itr = (1 to 100000000).iterator
    itr: Iterator[Int] = non-empty iterator
    
    scala> itr filter (_ % 10000000 == 0) foreach println
    10000000
    ....
    100000000
    

    但是当我复制时:

    scala> val (a, b) = (1 to 100000000).iterator.duplicate
    a: Iterator[Int] = non-empty iterator
    b: Iterator[Int] = non-empty iterator
    
    scala> a filter (_ % 10000000 == 0) foreach println
    
    //oh dear, garbage collecting
    Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    

    在此示例中,当我运行 a 时,为了使 b 重复,a 已迭代的元素b 没有,需要被缓存

    【讨论】:

    • 感谢您的回复,非常感谢!我不太确定您的.duplicate 示例如何解释我的代码问题,因为我实际上从不遍历任何元素,是吗?我需要考虑使用.grouped。我觉得这完全不是我想要的,因为它返回某种列表迭代器。相比之下,我的递归方法旨在返回一个迭代器列表(因此反过来),因此我可以在该列表上调用 .par 而无需任何进一步的努力。先分组再调用.toList.par会导致同样的内存崩溃。有什么想法吗?
    • 好吧,您的解决方案不是尾递归的,而是使用Iterator.duplicate。我不确定为什么 Lists 的迭代器完全不是您想要的。也许这里有绝地思维技巧?
    • 我知道无尾递归,但这是问题所在吗?递归深度为 4,因此这将产生一个最大列表。尺寸 16(我希望我没有错)。我使用重复是,但我不评估任何元素,所以不需要缓存,为什么它仍然失败?您的示例也运行了一段时间,所以这一定意味着真的只有评估的元素被缓存,还是我错了?我不能使用 Iterator[List],因为我想通过 par 在 List[Iterator] 上进行线程处理,而不需要之前评估子迭代器。在 Iterator[List] 上调用 toList 会将所有数据加载到内存中。
    • 好吧,我可以试试myGroupedIterator.next.par,让我试试吧!对不起。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-03-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-22
    • 2020-12-21
    • 1970-01-01
    相关资源
    最近更新 更多