【问题标题】:Spark is duplicating workSpark 正在重复工作
【发布时间】:2016-12-22 15:40:52
【问题描述】:

我正面临 Spark 的一种奇怪行为。这是我的代码:

object MyJob {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext()
        val sqlContext = new hive.HiveContext(sc)

        val query = "<Some Hive Query>"
        val rawData = sqlContext.sql(query).cache()

        val aggregatedData = rawData.groupBy("group_key")
           .agg(
               max("col1").as("max"),
               min("col2").as("min")
           )

        val redisConfig =  new RedisConfig(new RedisEndpoint(sc.getConf))
        aggregatedData.foreachPartition {
            rows =>
               writePartitionToRedis(rows, redisConfig)
        }

       aggregatedData.write.parquet(s"/data/output.parquet")
    }
}

与我的直觉相反,Spark 调度程序为每个数据接收器(Redis、HDFS/Parquet)生成两个作业。问题是第二个工作也是执行 hive 查询并使工作加倍。我假设两个写操作都会共享来自aggregatedData 阶段的数据。有什么问题还是预期的行为?

【问题讨论】:

    标签: scala hadoop apache-spark


    【解决方案1】:

    您错过了火花的一个基本概念:懒惰。

    RDD 不包含任何数据,它只是一组指令,当您调用某个操作(例如将数据写入磁盘/hdfs)时将执行这些指令。如果您重用 RDD(或 Dataframe),则没有存储数据,只需存储每次调用操作时都需要评估的指令。

    如果您想重用数据而不需要重新评估 RDD,请使用 .cache() 或最好使用 persist。持久化 RDD 允许您存储转换的结果,以便在未来的迭代中不需要重新评估 RDD。

    【讨论】:

    • 我正在缓存,但在 rawData 步骤上。我仍然在第二份工作中看到了这一步的一些任务。我如何确定第二个作业是从缓存中获取其阶段数据而不是执行不必要的工作?
    • 是的,您是缓存原始数据,但不是聚合数据,因此需要在每次迭代时重新评估。如果您进入 spark 应用程序 UI,您将看到有关您的应用程序正在执行的操作的所有数据。确保您缓存的数据适合内存,如果不尝试序列化或缓存到磁盘(使用persist 和适当的存储级别)。
    猜你喜欢
    • 1970-01-01
    • 2020-06-19
    • 2021-04-24
    • 1970-01-01
    • 1970-01-01
    • 2020-08-19
    • 2017-07-15
    • 2015-09-06
    • 2016-05-06
    相关资源
    最近更新 更多