【问题标题】:Split Spark dataframe and calculate average based on one column value拆分 Spark 数据帧并根据一列值计算平均值
【发布时间】:2017-09-15 02:30:32
【问题描述】:

我有两个数据框,第一个数据框 classRecord 有 10 个不同的条目,如下所示:

Class, Calculation
first, Average
Second, Sum
Third, Average

第二个数据框 studentRecord 有大约 50K 条目,如下所示:

Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second

从第二个数据帧,基于类类型,我想分别基于营地计算高度(对于第一类:平均,对于第二类:总和等)(如果类是第一类,则黄色的平均值,白色等分开)。 我尝试了以下代码:

//function to calculate average
def averageOnName(splitFrame : org.apache.spark.sql.DataFrame ) : Array[(String, Double)] = {
  val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name",$"height".cast("double")).as[(String, Double)].rdd
  var avg_by_key = pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1 / y._2).collect
  return avg_by_key
}

//required schema for further modifications
val schema = StructType(
StructField("name", StringType, false) ::
StructField("avg", DoubleType, false) :: Nil)

// for each loop on each class type
classRecord.rdd.foreach{
  //filter students based on camps
  var campYellow =studentRecord.filter($"Camp" === "yellow")
  var campWhite =studentRecord.filter($"Camp" === "white")
  var campRed =studentRecord.filter($"Camp" === "red")

  // since I know that calculation for first class is average, so representing calculation only for class first
  val avgcampYellow  =  averageOnName(campYellow)
  val avgcampWhite   =  averageOnName(campWhite)
  val avgcampRed   =  averageOnName(campRed)

  // union of all
  val rddYellow = sc.parallelize (avgcampYellow).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  //conversion of rdd to frame
  var dfYellow = sqlContext.createDataFrame(rddYellow, schema)
  //union with yellow camp data
  val rddWhite = sc.parallelize (avgcampWhite).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  //conversion of rdd to frame
  var dfWhite = sqlContext.createDataFrame(rddWhite, schema)
  var dfYellWhite = dfYellow.union(dfWhite)
  //union with yellow,white camp data
  val rddRed = sc.parallelize (avgcampRed).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  //conversion of rdd to frame
  var dfRed = sqlContext.createDataFrame(rddRed, schema)
  var dfYellWhiteRed = dfYellWhite .union(dfRed)
  // other modifications and final result to hive
}

我在这里苦苦挣扎:

  1. 硬编码黄色、红色和白色,可能还有其他营地类型。
  2. 数据框目前正在过滤多次,有待改进。
  3. 我无法弄清楚如何根据类计算类型进行不同的计算(即根据类类型使用总和/平均值)。

感谢任何帮助。

【问题讨论】:

    标签: scala apache-spark mapreduce apache-spark-sql


    【解决方案1】:

    您可以简单地对 Class/Camp 的所有组合进行平均和求和计算,然后分别解析 classRecord 数据帧并提取您需要的内容。您可以使用 groupBy() 方法在 spark 中轻松完成此操作并聚合值。

    使用您的示例数据框:

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._
    
    studentRecord.show()
    
    +-------+------+------+------+
    |   Name|height|  Camp| Class|
    +-------+------+------+------+
    |   Shae|   152|yellow| first|
    |    Joe|   140|yellow| first|
    |   Mike|   149| white| first|
    |   Anne|   142|   red| first|
    |    Tim|   154|   red|Second|
    |   Jake|   153| white|Second|
    |Sherley|   153| white|Second|
    +-------+------+------+------+
    
    val df = studentRecord.groupBy("Class", "Camp")
      .agg(
        sum($"height").as("Sum"), 
        avg($"height").as("Average"), 
        collect_list($"Name").as("Names")
      )
    df.show()
    
    +------+------+---+-------+---------------+
    | Class|  Camp|Sum|Average|          Names|
    +------+------+---+-------+---------------+
    | first| white|149|  149.0|         [Mike]|
    | first|   red|142|  142.0|         [Anne]|
    |Second|   red|154|  154.0|          [Tim]|
    |Second| white|306|  153.0|[Jake, Sherley]|
    | first|yellow|292|  146.0|    [Shae, Joe]|
    +------+------+---+-------+---------------+
    

    完成此操作后,您可以简单地检查您的第一个 classRecord 数据框,之后您需要哪些行。它的外观示例,可以根据您的实际需要进行更改:

    // Collects the dataframe as an Array[(String, String)]
    val classRecs = classRecord.collect().map{case Row(clas: String, calc: String) => (clas, calc)}
    
    for (classRec <- classRecs){
      val clas = classRec._1
      val calc = classRec._2
    
      // Matches which calculation you want to do
      val df2 = calc match {
        case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average")
        case "Sum" => df.filter($"Class" === clas).select("Class", "Camp", "Sum")
      }
    
    // Do something with df2
    }
    

    希望对你有帮助!

    【讨论】:

    • 部分是这样的,我需要所有的名字也属于案例,比如“类,营地,名字,平均”。即使我得到最终的DF。我将如何决定首先上课我需要选择平均值(丢弃总和),其次我需要总和(丢弃平均)等等。
    • 我也尝试了上述解决方案,它显示错误:值 groupby 不是 org.apache.spark.rdd.RDD[String] 的成员。谢谢。
    • @Swati 抱歉,应该是groupBy(),大写字母“B”。将名称列表也添加到解决方案中。
    • @shaido...我明白了。错误是我的,我没有将我的 rdd 转换为帧。我仍然在努力解决第二个问题,即在获得 DF 之后,我应该如何决定:首先我需要选择平均值(丢弃总和),其次我需要总和(丢弃平均)等等。
    • @Swati 添加了一些可以帮助您的代码
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-11-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多