【问题标题】:How to find average of a particular field in Scala如何在Scala中找到特定字段的平均值
【发布时间】:2017-12-01 20:11:35
【问题描述】:

我正在 Apache Spark 中的 Scala shell 中进行试验。 我有一个包含值列表的文本文件,我想找到特定列的平均值。我的 input.txt 文件如下所示。 (这不是整个文件,而是一个示例。)

1   12.4   12.5   18.9   19.9
2   1.7    1.9
3   11.99  1.9    8.9    12.90978933
2   89.987  7.99         12.898980800000
1   12.8    1.88  1.8
2   1.9     1.8   1.8979  1.808888

我想找到第 1 列中每列第 5 列的平均值。例如,假设这些是一组学生 ID 和标记。对于每个学生 ID,我想找到最后一门科目的分数。另请注意,最后一列中缺少某些值。

这是我迄今为止尝试过的代码。

val text = sc.textFile("/neerja/input.txt")
val data = text.flatMap(line => line.split("\\t")).map(word => (word,1).reduceByKey(_ + _);

我想获取最后一列并找到平均值。 作为第一步,我想获取最后一列中的所有值。

val fourth = text.map(_.split("\\t")(4)).collect

但这给了我ArrayIndexOutOfBoundException。我怀疑它的发生是因为最后一列中缺少一些值。请帮我找出最后一列的平均值。任何帮助将不胜感激。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以简单地执行以下操作

    val text = sc.textFile("/neerja/input.txt")
    
    val fourth = text.map(line => line.split("\\t"))
          .map(arr => Try(arr(4).toDouble) getOrElse(0.0)).mean()
    
    println(fourth)
    

    你应该得到第 5 列主题的平均值

    更新

    如果需要所有主题列的平均值,我建议您创建dataframeDataframes 已优化 RDD 并且许多内置函数可用于计算。

    要为给定的数据创建dataframe,您需要schema

    import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}
    val schema = StructType(Seq(
      StructField("Sn", IntegerType, true),
      StructField("subject1", DoubleType, true),
      StructField("subject2", DoubleType, true),
      StructField("subject3", DoubleType, true),
      StructField("subject4", DoubleType, true)
    ))
    

    RDD[Row] 需要创建为

    val data = text.map(line => line.split("\\t"))
      .map(arr => Row.fromSeq(Seq(arr(0).toInt, Try(arr(1).asInstanceOf[DoubleType]) getOrElse(0.0),Try(arr(2).toDouble) getOrElse(0.0),Try(arr(3).toDouble) getOrElse(0.0),Try(arr(4).toDouble) getOrElse(0.0))))
    

    最终创建数据框

    val df = sqlContext.createDataFrame(data, schema)
    

    每列的平均值可以通过mean函数as来计算

    df.select(mean("subject1").as("averageOFS1"),mean("subject2").as("averageOFS2"),mean("subject3").as("averageOFS3"),mean("subject4").as("averageOFS4")).show(false)
    

    应该给你dataframe

    +------------------+-----------------+-----------+-----------------+
    |averageOFS1       |averageOFS2      |averageOFS3|averageOFS4      |
    +------------------+-----------------+-----------+-----------------+
    |21.796166666666668|4.661666666666666|5.24965    |7.919609688333335|
    +------------------+-----------------+-----------+-----------------+
    

    【讨论】:

    • 非常感谢 Ramesh Maharjan。这当然帮助了我。但是我得到了这样的答案。 "scala> val 第四 = text.map(line => line.split("\\t")).map(arr => Try(arr(4).toDouble) getOrElse(0.0)).mean() 第四: Double = 0.5828306100675447 这是第 4 列的平均值。我真正想要的是“每个”主题的平均值。例如,我的预期输出是这样的。(每个主题,平均值)1, 19.9 2,7.3539344 3,12.90978933
    • 我的荣幸 :) @NeerjaSkyler,当你有资格时不要忘记投票
    • 当然,会的:)
    • getOrElse(0.0) 可能是个坏主意。相反,您应该在第一步中过滤掉空/非双精度值,然后计算平均值。
    • 是的,我同意@RickMoritz,更好的答案是使用 sqlContext,因为我已经回答了here
    【解决方案2】:

    如果您确实想尝试一种结构化的方法,您也可以使用 Dataframes 来实现:

    object average extends App{
    
      val sparkSession = SparkSession.builder
        .master("local")
        .appName("example")
        .getOrCreate()
    
      import sparkSession.implicits._
    
    
          val x = sparkSession.read
            .option("header", "false")
            .option("delimiter", "\\t")
            .option("mode", "FAILFAST")
            .csv("...Spark-2.x/src/main/resources/tab_data.csv")
    
    x.printSchema()
    x.show(truncate = false)
     val df: DataFrame =  x.select('_c0 as "id",
    '_c1 as "sub1",'_c2 as "sub2",'_c3 as "sub3",'_c4 as "sub4")
    
      df.groupBy('id).agg(avg('sub4)).show()
    }
    

    【讨论】:

      猜你喜欢
      • 2022-01-08
      • 1970-01-01
      • 1970-01-01
      • 2011-03-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多