【问题标题】:Should you always cache an RDD before an intermediate count?你应该总是在中间计数之前缓存一个 RDD 吗?
【发布时间】:2017-11-21 21:30:32
【问题描述】:

我想在开始和结束转换之间记录 RDD 中的行数。我的代码目前如下所示:

val transformation1 = firstTransformation(inputdata).cache  // Is this cache recommended or can I remove it?
log("Transformation1 count: " + tranformation1.count)
val tranformation2 = secondTransformation(transformation1).cache
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = tranformation2.filter(row => row.contains("y"))

我的问题是 transformation1 是一个巨大的 RDD 并且占用了大量内存(它适合内存但稍后会导致内存问题)。但是,我知道,由于我在 transformation1(.count()secondTransformation())上执行 2 种不同的操作,因此通常建议将其缓存。

这种情况可能很常见,那么推荐的处理方法是什么?您应该始终在中间计数之前缓存 RDD,还是可以删除 transformation1 上的 .cache()

【问题讨论】:

    标签: scala apache-spark caching


    【解决方案1】:

    如果您遇到内存问题,应尽快取消持久化,也可以在磁盘上持久化。

    val transformation1 = firstTransformation(inputdata).persist(StorageLevel.DISK_ONLY)  // Is this cache recommended or can I remove it?
    log("Transformation1 count: " + tranformation1.count)
    val tranformation2 = secondTransformation(transformation1).persist(StorageLevel.DISK_ONLY)
    val finalX = transformation2.filter(row => row.contains("x"))
    val finalY = tranformation2.filter(row => row.contains("y"))
    // All the actions are done
    transformation1.unpersist()
    transformation2.unpersist()
    

    如果您可以在内存问题发生之前使用 unpersist,那么如果您缓存而不是持久存储在磁盘上会更好

    【讨论】:

    • 取消持久化时,Spark 崩溃并显示Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    • 这是一个不同的问题,看看stackoverflow.com/questions/41123846/…
    猜你喜欢
    • 2012-03-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-12-29
    • 2010-09-13
    • 1970-01-01
    相关资源
    最近更新 更多