【问题标题】:For loop to select a column in ScalaFor循环在Scala中选择一列
【发布时间】:2020-01-23 00:19:45
【问题描述】:

我想在 scala 中为 spark 数据帧实现以下目标,

  1. 对于每一列,选择 colname 和 flag 变量(0 或 1)
  2. 在 flag = 0 和 flag = 1 时求列的平均值
  3. 列的标准开发

我不确定如何遍历列并在循环的每次迭代中选择每一列并标记变量。我尝试的是:-

 for (a <- colnames) {
      val dat1 = data.filter($"cust_flag".isin("1")).select(a)
      val dat0 = data.filter($"cust_flag".isin("0")).select(a)
      val m0 = dat1.select(avg(a)).asInstanceOf[Double]
      val m1 = dat0.select(avg(a)).asInstanceOf[Float]
      val stdev = data.agg(stddev(a)).asInstanceOf[Float]
      val rpb = ((m1 - m0) / stdev)*p*q
      println(rpb)

现在我收到一个错误 - 线程“main”java.lang.ClassCastException 中的异常:org.apache.spark.sql.Dataset 无法转换为 java.lang.Float

【问题讨论】:

  • 能否提供输入数据框和预期输出
  • 编辑问题以添加代码
  • 什么是 p 和 q ?并尽可能提供样本数据

标签: scala apache-spark apache-spark-sql


【解决方案1】:

要从字符串中创建具有给定名称的列,一种简单的方法是使用:

import org.apache.spark.sql.{functions => sf}

df.select(sf.col(colName))

您可以在您认为合适的情况下将其组合到控制逻辑(您的循环)中。

如果您想知道数据框中的列,请使用df.columns

【讨论】:

    【解决方案2】:

    我们有mean()stddev() 的直接函数

    创建两个过滤数据集

    即。

    1 代表标志 =0 和 2 代表标志 =1 和

    dfcol0= df.filter(df("colname") === "0")
    dfcol1= df.filter(df("colname") === "1")
    

    现在使用stddev()mean() 函数获取所需的内容。

     dfcol0.select(stddev("coname")).show(false)
     dfcol0.select(mean("coname")).show(false)
    

    【讨论】:

      【解决方案3】:

      据我了解,您正在尝试根据每列的标志和标准偏差来获取每列的平均值,而与标志无关。之后,您将应用公式并计算 rpb。

      在相同的逻辑基础上,我采取了样本数据并编写了没有循环的代码。它会比您使用的循环逻辑更快。 Spark 不擅长循环逻辑,因此请尝试将所有需要的数据放入一行(例如下面示例中的 avg0、avg1 和 StdDev),然后水平或批量处理。

      请注意,正如我在上面评论的那样。我不明白 p 的价值 和 q 所以我在最终的输出数据帧逻辑中忽略了它。你可以 如果这些是之前声明的变量,则直接添加。

      scala> import org.apache.spark.sql.types._
      scala> val df = Seq(
           |   ("121",  "442",  "512","1"),
           |   ("134",  "434",  "752","0"),
           |   ("423",  "312",  "124","1"),
           |     ("432",  "677",  "752","0"),
           |   ("332",  "424",  "111","1")).
           |   toDF("col1","col2","col3","cust_flag").
           |   withColumn("col1", $"col1".cast(DoubleType)).
           |   withColumn("col2", $"col2".cast(DoubleType)).
           |   withColumn("col3", $"col3".cast(DoubleType))
      
      scala> df.show
      +-----+-----+-----+---------+
      | col1| col2| col3|cust_flag|
      +-----+-----+-----+---------+
      |121.0|442.0|512.0|        1|
      |134.0|434.0|752.0|        0|
      |423.0|312.0|124.0|        1|
      |432.0|677.0|752.0|        0|
      |332.0|424.0|111.0|        1|
      +-----+-----+-----+---------+
      
      scala>val colSeq =  Seq("col1", "col2", "col3")
      
      
      scala>  val aggdf  = colSeq.map(c => {
           | df.groupBy("cust_flag").agg( lit(c).alias("columnName"), avg(c).cast("Decimal(14,2)").alias("avg"))
           | })
      
      
      scala> val devdf  = colSeq.map(c => {
           |   df.agg( lit(c).alias("columnName"), stddev(c).cast("Decimal(14,2)").alias("StdDev"))
           |   })
      
      scala> val avgDF = aggdf.reduce(_ union _)
      
      scala> val stdDevDF = devdf.reduce(_ union _)
      
      scala> val finalAvgDF = avgDF.filter(col("cust_flag") === 1).alias("1").join(avgDF.filter(col("cust_flag") === 0).alias("0"), List("columnName")).select(col("columnName"), col("1.avg").alias("avg1"), col("0.avg").alias("avg0"))
      
      scala> val outDF = finalAvgDF.join(stdDevDF, List("columnName"))
      
      scala> outDF.show()
      +----------+------+------+------+                                               
      |columnName|  avg1|  avg0|StdDev|
      +----------+------+------+------+
      |      col1|292.00|283.00|152.07|
      |      col2|392.67|555.50|133.48|
      |      col3|249.00|752.00|319.16|
      +----------+------+------+------+
      
      //apply your final formula to ger rpb
      scala> outDF.withColumn("rpb", (col("avg1") - col("avg0"))/col("StdDev")).show
      +----------+------+------+------+--------------------+                          
      |columnName|  avg1|  avg0|StdDev|                 rpb|
      +----------+------+------+------+--------------------+
      |      col1|292.00|283.00|152.07| 0.05918327086210298|
      |      col2|392.67|555.50|133.48|-1.21988312855858556|
      |      col3|249.00|752.00|319.16|-1.57601203158290513|
      +----------+------+------+------+--------------------+ 
      

      【讨论】:

        【解决方案4】:

        我建议你使用df.selectExpr(),它可以接受一个字符串序列:

        val expressions = Seq("avg(col1) as avg_col1","std_dev(col1) as sd_col1", "...")
        
        df.selectExpr(expressions:_*)
        

        您几乎可以使用该函数在 for 循环中构建表达式数组的所有操作。

        无论如何,我建议您向我们展示一个预期输入/输出的示例(您编写的代码并不能说明太多)。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2012-08-18
          • 1970-01-01
          • 2019-04-06
          • 2017-10-03
          • 1970-01-01
          • 2013-06-02
          • 2021-03-20
          • 1970-01-01
          相关资源
          最近更新 更多