【发布时间】:2016-12-22 15:40:52
【问题描述】:
我正面临 Spark 的一种奇怪行为。这是我的代码:
object MyJob {
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val sqlContext = new hive.HiveContext(sc)
val query = "<Some Hive Query>"
val rawData = sqlContext.sql(query).cache()
val aggregatedData = rawData.groupBy("group_key")
.agg(
max("col1").as("max"),
min("col2").as("min")
)
val redisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))
aggregatedData.foreachPartition {
rows =>
writePartitionToRedis(rows, redisConfig)
}
aggregatedData.write.parquet(s"/data/output.parquet")
}
}
与我的直觉相反,Spark 调度程序为每个数据接收器(Redis、HDFS/Parquet)生成两个作业。问题是第二个工作也是执行 hive 查询并使工作加倍。我假设两个写操作都会共享来自aggregatedData 阶段的数据。有什么问题还是预期的行为?
【问题讨论】:
标签: scala hadoop apache-spark