【Posted】: 2017-09-15 02:30:32
【Question】:
I have two DataFrames. The first one, classRecord, has 10 distinct entries, like this:
Class, Calculation
first, Average
Second, Sum
Third, Average
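classRecord works as a lookup from Class to a calculation name, so one way to picture it is as a dispatch table. Below is a minimal plain-Scala sketch, not Spark code. The names `CalculationLookup`, `aggregations` and `aggregationFor` are hypothetical, and only the two calculations in the sample (Average and Sum) are covered. In Spark, the equivalent lookup would presumably be a join of studentRecord with classRecord on the Class column, e.g. `studentRecord.join(classRecord, "Class")`.

```scala
// Hypothetical plain-Scala model: each Calculation value in classRecord
// maps to a function that aggregates a group of heights.
object CalculationLookup {
  val aggregations: Map[String, Seq[Double] => Double] = Map(
    "Average" -> ((xs: Seq[Double]) => xs.sum / xs.size),
    "Sum"     -> ((xs: Seq[Double]) => xs.sum)
  )

  // Sample classRecord from the question, as Class -> Calculation
  val classRecord: Map[String, String] = Map(
    "first"  -> "Average",
    "Second" -> "Sum",
    "Third"  -> "Average"
  )

  // Resolve the aggregation for a class; throws NoSuchElementException
  // for an unknown class or calculation name
  def aggregationFor(className: String): Seq[Double] => Double =
    aggregations(classRecord(className))

  def main(args: Array[String]): Unit = {
    val heights = Seq(152.0, 140.0)
    println(aggregationFor("first")(heights))  // 146.0
    println(aggregationFor("Second")(heights)) // 292.0
  }
}
```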
The second DataFrame, studentRecord, has about 50K entries, like this:
Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second
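From the second DataFrame, I want to calculate height per camp, using the calculation that each class specifies (Average for class first, Sum for class Second, and so on). For example, if the class is first, I want the average height of the yellow camp, of the white camp, and so on, each computed separately. I tried the following code: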
// Imports the snippet relies on (assumes spark-shell on Spark 2.x, with sc and sqlContext in scope)
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import sqlContext.implicits._ // for $"..." and .as[(String, Double)]

// Function to calculate the average height per Name
def averageOnName(splitFrame: DataFrame): Array[(String, Double)] = {
  val pairedRDD: RDD[(String, Double)] =
    splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
  // Sum the heights and counts per key, then divide
  pairedRDD
    .mapValues(x => (x, 1))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .mapValues(y => y._1 / y._2)
    .collect()
}

// Schema required for further modifications
val schema = StructType(
  StructField("name", StringType, nullable = false) ::
  StructField("avg", DoubleType, nullable = false) :: Nil)

// Loop over each class type. classRecord is small, so collect() it to the driver:
// DataFrames cannot be used inside an executor-side classRecord.rdd.foreach
classRecord.collect().foreach { classRow =>
  val className = classRow.getAs[String]("Class")
  // Filter this class's students, then split them by camp
  val classStudents = studentRecord.filter($"Class" === className)
  val campYellow = classStudents.filter($"Camp" === "yellow")
  val campWhite = classStudents.filter($"Camp" === "white")
  val campRed = classStudents.filter($"Camp" === "red")
  // I know the calculation for class first is Average, so only that case is shown here
  val avgCampYellow = averageOnName(campYellow)
  val avgCampWhite = averageOnName(campWhite)
  val avgCampRed = averageOnName(campRed)
  // Convert each result array to a DataFrame
  val rddYellow = sc.parallelize(avgCampYellow).map { case (name, avg) => Row(name, avg) }
  val dfYellow = sqlContext.createDataFrame(rddYellow, schema)
  val rddWhite = sc.parallelize(avgCampWhite).map { case (name, avg) => Row(name, avg) }
  val dfWhite = sqlContext.createDataFrame(rddWhite, schema)
  val rddRed = sc.parallelize(avgCampRed).map { case (name, avg) => Row(name, avg) }
  val dfRed = sqlContext.createDataFrame(rddRed, schema)
  // Union of the yellow, white and red camp results
  val dfYellWhiteRed = dfYellow.union(dfWhite).union(dfRed)
  // Other modifications, and the final result is written to Hive
}
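The averaging in averageOnName uses the usual (sum, count) pattern. Each value is paired with 1, the pairs are added per key with reduceByKey, and the total is divided by the count. Below is a minimal plain-Scala model of that pattern on local collections (no Spark). It is keyed by camp rather than by Name, which is an assumption based on the goal of one result per camp. The object and method names are hypothetical.

```scala
// Plain-Scala model of the (sum, count) averaging used by averageOnName:
// pair each value with 1, add the pairs per key, then divide.
object AverageByKey {
  def averageByKey(rows: Seq[(String, Double)]): Map[String, Double] =
    rows
      .groupBy { case (key, _) => key }
      .map { case (key, group) =>
        // This reduce mirrors reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        val (total, count) = group
          .map { case (_, value) => (value, 1) }
          .reduce((x, y) => (x._1 + y._1, x._2 + y._2))
        key -> total / count
      }

  def main(args: Array[String]): Unit = {
    // (Camp, height) pairs for class "first" from the question's sample data
    val firstClass = Seq(
      ("yellow", 152.0), ("yellow", 140.0),
      ("white", 149.0), ("red", 142.0)
    )
    println(averageByKey(firstClass))
  }
}
```

Spark combines pairs within each partition before the shuffle. That is why the count is carried along: an average of partial averages is not the overall average unless each partial result is weighted by its count.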
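Here is where I'm struggling:
- The camps yellow, red and white are hardcoded, and there may be other camp types.
- The DataFrame is currently filtered several times (once per camp), which should be improved.
- I can't figure out how to apply a different calculation depending on the class's Calculation value (i.e. use sum or average depending on the class type).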
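All three issues point toward grouping once by (Class, Camp) instead of filtering per camp. Below is a plain-Scala model of that idea, with these assumptions:

- The classRecord contents are the three sample rows.
- Only Average and Sum exist.
- `PerClassPerCamp`, `Student` and `compute` are hypothetical names.

Camps come from the data, so a new camp needs no code change. In Spark, the analogous shape would likely be:

1. Join studentRecord with classRecord on Class.
2. Run a single `groupBy("Class", "Camp", "Calculation")` with `avg("height")` and `sum("height")` in one `agg`.
3. Pick the matching column with `when(...).otherwise(...)` from `org.apache.spark.sql.functions`.

That Spark version is not shown or tested here.

```scala
// Hypothetical single-pass model of the task with plain Scala collections:
// no hardcoded camps, one grouping, calculation chosen per class.
object PerClassPerCamp {
  final case class Student(name: String, height: Double, camp: String, className: String)

  val aggregations: Map[String, Seq[Double] => Double] = Map(
    "Average" -> ((xs: Seq[Double]) => xs.sum / xs.size),
    "Sum"     -> ((xs: Seq[Double]) => xs.sum)
  )

  val sampleClassRecord: Map[String, String] =
    Map("first" -> "Average", "Second" -> "Sum", "Third" -> "Average")

  val sampleStudents: Seq[Student] = Seq(
    Student("Shae", 152.0, "yellow", "first"),
    Student("Joe", 140.0, "yellow", "first"),
    Student("Mike", 149.0, "white", "first"),
    Student("Anne", 142.0, "red", "first"),
    Student("Tim", 154.0, "red", "Second"),
    Student("Jake", 153.0, "white", "Second"),
    Student("Sherley", 153.0, "white", "Second")
  )

  def compute(classRecord: Map[String, String],
              students: Seq[Student]): Map[(String, String), Double] =
    students
      // One grouping; camps are discovered from the data, not listed in code
      .groupBy(s => (s.className, s.camp))
      // Keep only classes present in classRecord (like an inner join) and
      // apply that class's aggregation to the group's heights
      .collect { case ((cls, camp), group) if classRecord.contains(cls) =>
        val agg = aggregations(classRecord(cls))
        (cls, camp) -> agg(group.map(_.height))
      }

  def main(args: Array[String]): Unit =
    compute(sampleClassRecord, sampleStudents).toSeq.sortBy(_._1).foreach(println)
}
```

Classes without students (Third in the sample) produce no rows. This matches what a grouped aggregation after an inner join would return.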
Any help is appreciated.
【Discussion】:
Tags: scala apache-spark mapreduce apache-spark-sql