【问题标题】:Spark: Apply multiple transformations without recalculating or cachingSpark:应用多个转换而无需重新计算或缓存
【发布时间】:2017-10-27 23:49:03
【问题描述】:

是否可以获取转换(RDD/Dataframe)的输出并将其提供给两个独立的转换,而无需重新计算第一个转换并且不缓存整个数据集?

加长版

考虑案例。

我有一个非常大的数据集,无法放入内存。现在我对其进行了一些转换,以准备要有效处理的数据(分组、过滤、排序......):

DATASET --(TF1: transformation with group by, etc)--> DF1
DF1 --(TF2: more_transformations_some_columns)--> output
DF1 --(TF3: more_transformations_other_columns)--> output2

我想知道是否有任何方法(或在开发中计划)告诉 Spark,在 TF1 之后,它必须重用相同的结果(在分区级别,不缓存所有内容!)以同时为 TF2 和 TF3 服务。

这在概念上可以想象为每个分区的缓存(),当分区被进一步转换消耗时自动取消持久()。

我搜索了很长时间,但找不到任何方法。

我的尝试:

DF1 = spark.read()... .groupBy().agg()...
DF2 = DF1.select("col1").cache()  # col1 fits in mem
DF3 = DF1.select("col1", transformation(other_cols)).write()...  # Force evaluation of col1

不幸的是,DF3 无法猜测到 col1 的缓存。所以显然不可能要求 spark 只缓存几列。这已经可以缓解问题了。

有什么想法吗?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    我认为不能只缓存一些列,
    但这能解决你的问题吗?

    DF1 = spark.read()... .groupBy().agg()...
    DF3 = DF1.select("col1", transformation(other_cols)).cache()
    DF3.write()
    DF2 = DF3.select("col1")
    

    【讨论】:

    • 您的解决方案在第一次转换后使用 cache() ,它不适合内存。没有缓存并使用DF2=spark.read(df3_file).select("col1") 会起作用,但目标是避免从磁盘读取。
    猜你喜欢
    • 2019-07-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-05
    • 1970-01-01
    • 2020-03-05
    • 2017-02-25
    • 1970-01-01
    相关资源
    最近更新 更多