【问题标题】:What is the most efficient way to force immediate caching in Spark?在 Spark 中强制立即缓存的最有效方法是什么?
【发布时间】:2020-06-04 04:56:16
【问题描述】:

出于计时目的,我需要在执行函数之前强制缓存。 我最初的方法是使用 count() 操作,因为这将在所有分区中缓存 RDD,这与 take() 不同,但是在计算、通信或时间方面是否有更有效的方法来强制它?

// Load data, partition and mark to be cached
val data = sc.textFile("input.txt").map(_.toInt)
val partitioner = new RangePartitioner(16, data)
val partitioned_data = data.partitionBy(partitioner).cache()

// Force cache with count or something more efficient
partitioned_data.count()

// Do something
something(partitioned_data)

【问题讨论】:

  • 不需要强制缓存,因为在调用动作时会自动触发缓存,在上述情况下计数。 Spark DAG 将根据转换和操作自动创建缓存数据的计划。
  • 如果你真的想强制,只需触发一个动作。 IE。将其写入文件。
  • @hagarwal 我知道缓存是惰性的,因此只有在调用动作时才会触发,我的问题是我可以调用的最便宜的动作会触发缓存,有什么比count() 更好?

标签: scala apache-spark caching rdd


【解决方案1】:

所有这些都取决于您要做什么。如果您注意到您的环境已接近其持久内存的限制,我建议您使用保存到本地、清除缓存、重新加载和重新缓存技术。但是,下面我已经对所有简单的函数进行了 ncatalogue,并针对 2M 的记录文件运行它们,以比较它们的相对运行时间。

讲台是这样的:

1st(三通):Take(1),Take(1000),First;时间:9秒

第四个:计数;时间:17秒

5th:收集;时间:21秒

Disclaimer-1:是的,我知道计数输了,但我宣布它是秘密赢家,因为它被授予了一堆任意的风格点,主要是因为我认为这个答案正在慢慢变成“反正是谁的线”。

Disclaimer-2:所有测试都能够使用默认的 Spark 内存配置运行,但 collect 除外,我需要将其设置为大约一个更高的系数,它提供了 21 秒的运行时间。

如果你想在家里尝试一下,这里是你可以运行的代码(播放陈词滥调的游戏音乐):

val inputDF = spark.read.format("").load("")

var arrayOfCommand : Array[String] = Array("")
var arrayOfTime : Array[Long] = Array("0".toLong)

inputDF.count

val inputDF2 = inputDF.selectExpr("*", "'Count Run' as CommandColumn").persist

val countStartTime = System.nanoTime()
inputDF2.count

val countEndTime = System.nanoTime()
val countRunTime = (countEndTime-countStartTime)/1000000000

arrayOfCommand = Array("Count")
arrayOfTime = Array(countRunTime)

spark.catalog.clearCache
val inputDF3 = inputDF.selectExpr("*", "'Take 1 Run' as CommandColumn").persist

val takeStartTime = System.nanoTime()
inputDF3.take(1)

val takeEndTime = System.nanoTime()
val takeRunTime = (takeEndTime-takeStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Take(1)")
arrayOfTime = arrayOfTime ++ Array(takeRunTime)

spark.catalog.clearCache
val inputDF4 = inputDF.selectExpr("*", "'Take 1000 Run' as CommandColumn").persist

val takeStartTime2 = System.nanoTime()
inputDF4.take(1000)

val takeEndTime2 = System.nanoTime()
val takeRunTime2 = (takeEndTime2-takeStartTime2)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Take(1000)")
arrayOfTime = arrayOfTime ++ Array(takeRunTime)

spark.catalog.clearCache
val inputDF5 = inputDF.selectExpr("*", "'Collect Run' as CommandColumn").persist

val collectStartTime = System.nanoTime()
inputDF5.collect

val collectEndTime = System.nanoTime()
val collectRunTime = (collectEndTime-collectStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Collect")
arrayOfTime = arrayOfTime ++ Array(collectRunTime)


spark.catalog.clearCache
val inputDF6 = inputDF.selectExpr("*", "'First Run' as CommandColumn").persist

val firstStartTime = System.nanoTime()
inputDF6.first

val firstEndTime = System.nanoTime()
val firstRunTime = (firstEndTime-firstStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("First")
arrayOfTime = arrayOfTime ++ Array(firstRunTime)

【讨论】:

  • 感谢您进行此比较,我决定坚持使用count(),因为take() 只会缓存它获取的分区而不是所有分区
猜你喜欢
  • 2011-04-17
  • 1970-01-01
  • 2020-03-23
  • 1970-01-01
  • 1970-01-01
  • 2013-07-04
  • 1970-01-01
  • 1970-01-01
  • 2014-08-14
相关资源
最近更新 更多