【发布时间】:2020-02-20 03:26:45
【问题描述】:
这涉及一些复杂性,我可能不清楚一些基础知识。如下:
据我了解,spark 具有“转换”和“动作”。转换会懒惰地建立对您想要做的事情的描述,然后通过行动来实现它。这可以提高性能(允许优化计划),或者如果您在单个数据帧上使用多个操作,可能会导致重复工作,从而导致转换重复触发。为避免这种情况,.cache() 告诉 Spark 实际上“保存其工作”,因此您调用它的数据帧不应继续重新计算。
我的问题是它似乎没有这样做。我有一个函数“Foo”,它进行大量计算以产生一个(非常小的)数据帧。 Foo 跑得很快,我可以显示结果。我有另一个函数“Bar”,它对数据框执行一系列操作。 Bar 在(大的)原始输入上运行很快,但在 foo 的输出上运行非常慢,甚至缓存和合并。我还可以通过将 foo 的输出写入磁盘然后重新读取它来“强制”缓存,此时 bar 会快速运行:
display(bar(bigDF)) //Fast!
val profile = foo(bigDF).coalesce(1).cache()
display(profile) //Also fast! (and profile only has 2 rows, ~80 columns)
display(bar(profile)) //Slow!
profile
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("filename.csv")
val dumb = spark.read.format("csv").option("header", "true").load("filename.csv")
display(bar(dumb)) //Fast again
对我来说,这说明 .cache() 没有像我认为的那样工作 - 缓慢的调用会反复重新调用 foo 中的转换,除非我将它写入磁盘并强制它“忘记” “ 它的历史。有人可以解释我缺少什么吗?
【问题讨论】:
-
尝试重新分区您的数据框....您的数据框可能有少量分区或分区高度倾斜
标签: scala apache-spark databricks