【发布时间】:2018-10-01 18:13:58
【问题描述】:
我的 Spark 应用如下:
1) 使用 Spark SQL 对数据框“dataDF”执行大型查询
2)“dataDF”中涉及的foreach分区:
2.1) 获取关联的“过滤”数据帧,以便只有分区关联数据
2.2) 对“过滤”数据帧进行特定工作并写入输出
代码如下:
val dataSQL = spark.sql("SELECT ...")
val dataDF = dataSQL.repartition($"partition")
for {
row <- dataDF.dropDuplicates("partition").collect
} yield {
val partition_str : String = row.getAs[String](0)
val filtered = dataDF.filter($"partition" .equalTo( lit( partition_str ) ) )
// ... on each partition, do work depending on the partition, and write result on HDFS
// Example :
if( partition_str == "category_A" ){
// do group by, do pivot, do mean, ...
val x = filtered
.groupBy("column1","column2")
...
// write final DF
x.write.parquet("some/path")
} else if( partition_str == "category_B" ) {
// select specific field and apply calculation on it
val y = filtered.select(...)
// write final DF
x.write.parquet("some/path")
} else if ( ... ) {
// other kind of calculation
// write results
} else {
// other kind of calculation
// write results
}
}
这样的算法可以成功。 Spark SQL 查询是完全分布式的。然而,在每个结果分区上完成的特定工作是按顺序完成的,结果效率低下,尤其是因为与分区相关的每个写入都是按顺序完成的。
在这种情况下,有什么方法可以将“for yield”替换为并行/异步的东西?
谢谢
【问题讨论】:
-
您可以在
map或flatMap函数参数中对工作人员进行操作。 -
您好,正如我所说,我必须在生成的 spark sql 数据帧的每个分区上做特定的工作(应用特定的操作),从而导致不同的计算和输出模式取决于分区。我的目标是让不同的计算依赖于分区并行工作。我不明白如何用简单的地图做到这一点。你能详细说明一下吗?
-
确实,'for' 表示顺序的,因此不是分布式的。不知道你为什么选择那个 - 可能需要阐明这个逻辑是什么。
-
嘿。我添加了一些伪代码。我想要实现的是“只是”让 if 语句中的每个块并行运行。一种方式,每个最终逻辑应用于来自公共数据帧的每个特定分区并行运行。谢谢
-
有一些关于在大数据中使用 .par 的帖子。
标签: scala apache-spark