【Question title】: Deleting constant columns in Spark: issue with a timestamp column
【Posted】: 2017-08-13 07:09:12
【Question】:

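Hi everyone, I wrote the code below to drop columns that hold a constant value. I first compute the standard deviation of every column and then drop the columns whose standard deviation equals zero. However, when the data contains a column of timestamp type, I get the following error. What should I do?

cannot resolve 'stddev_samp(time.1)' due to data type mismatch: argument 1 requires double type, however, 'time.1' is of timestamp type.;;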

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, stddev}

val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
//val df = spark.range(1, 1000).withColumn("X2", lit(0)).toDF("X1","X2")
val df = spark.read.option("inferSchema", "true").option("header", "true").csv("C:/Users/mhattabi/Desktop/dataTestCsvFile/dataTest2.txt")
df.show(5)
// Sample standard deviation of every column; backticks protect names containing dots, such as "time.1"
val aggs = df.columns.map(p => stddev(s"`${p}`").as(p))
val stddevs = df.select(aggs: _*)
val columnsToKeep: Seq[Column] = stddevs.first  // take the single aggregated row
  .toSeq  // convert to Seq[Any]
  .zip(df.columns)  // zip with column names
  .collect {
    // keep only names where stddev != 0 (quoted again so dotted names resolve)
    case (s: Double, c) if s != 0.0 => col(s"`${c}`")
  }
df.select(columnsToKeep: _*).show(5, false)

【Question comments】:

    Tags: scala apache-spark apache-spark-sql apache-spark-mllib


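    【Solution 1】:

    Using stddev

    stddev is only defined for numeric columns. To compute the standard deviation of a date column (or of a timestamp column, as in your case), first convert it to a numeric Unix timestamp (seconds since the epoch) with unix_timestamp: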

    scala> var myDF = (0 to 10).map(x => (x, scala.util.Random.nextDouble)).toDF("id", "rand_double")
    myDF: org.apache.spark.sql.DataFrame = [id: int, rand_double: double]
    
    scala> myDF = myDF.withColumn("Date", current_date())
    myDF: org.apache.spark.sql.DataFrame = [id: int, rand_double: double ... 1 more field]
    
    scala> myDF.printSchema
    root
     |-- id: integer (nullable = false)
     |-- rand_double: double (nullable = false)
     |-- Date: date (nullable = false)
    
    
    scala> myDF.show
    +---+-------------------+----------+
    | id|        rand_double|      Date|
    +---+-------------------+----------+
    |  0| 0.3786008989478248|2017-03-21|
    |  1| 0.5968932024004612|2017-03-21|
    |  2|0.05912760417456575|2017-03-21|
    |  3|0.29974600653895667|2017-03-21|
    |  4| 0.8448407414817856|2017-03-21|
    |  5| 0.2049495659443249|2017-03-21|
    |  6| 0.4184846380144779|2017-03-21|
    |  7|0.21400484330739022|2017-03-21|
    |  8| 0.9558142791013501|2017-03-21|
    |  9|0.32530639391058036|2017-03-21|
    | 10| 0.5100585655062743|2017-03-21|
    +---+-------------------+----------+
    
    scala> myDF = myDF.withColumn("Date", unix_timestamp($"Date"))
    myDF: org.apache.spark.sql.DataFrame = [id: int, rand_double: double ... 1 more field]
    
    scala> myDF.printSchema
    root
     |-- id: integer (nullable = false)
     |-- rand_double: double (nullable = false)
     |-- Date: long (nullable = true)
    
    
    scala> myDF.show
    +---+-------------------+----------+
    | id|        rand_double|      Date|
    +---+-------------------+----------+
    |  0| 0.3786008989478248|1490072400|
    |  1| 0.5968932024004612|1490072400|
    |  2|0.05912760417456575|1490072400|
    |  3|0.29974600653895667|1490072400|
    |  4| 0.8448407414817856|1490072400|
    |  5| 0.2049495659443249|1490072400|
    |  6| 0.4184846380144779|1490072400|
    |  7|0.21400484330739022|1490072400|
    |  8| 0.9558142791013501|1490072400|
    |  9|0.32530639391058036|1490072400|
    | 10| 0.5100585655062743|1490072400|
    +---+-------------------+----------+
    

    At this point all columns are numeric, so your code works as is:

    scala> :pa
    // Entering paste mode (ctrl-D to finish)
    
    val aggs = myDF.columns.map(p=>stddev(s"`${p}`").as(p))
    val stddevs = myDF.select(aggs: _*)
    val columnsToKeep: Seq[Column] = stddevs.first  // Take first row
      .toSeq  // convert to Seq[Any]
      .zip(myDF.columns)  // zip with column names
      .collect {
        // keep only names where stddev != 0
        case (s: Double, c) if s != 0.0  => col(c)
      }
    myDF.select(columnsToKeep: _*).show(false)
    
    // Exiting paste mode, now interpreting.
    
    +---+-------------------+
    |id |rand_double        |
    +---+-------------------+
    |0  |0.3786008989478248 |
    |1  |0.5968932024004612 |
    |2  |0.05912760417456575|
    |3  |0.29974600653895667|
    |4  |0.8448407414817856 |
    |5  |0.2049495659443249 |
    |6  |0.4184846380144779 |
    |7  |0.21400484330739022|
    |8  |0.9558142791013501 |
    |9  |0.32530639391058036|
    |10 |0.5100585655062743 |
    +---+-------------------+
    
    aggs: Array[org.apache.spark.sql.Column] = Array(stddev_samp(id) AS `id`, stddev_samp(rand_double) AS `rand_double`, stddev_samp(Date) AS `Date`)
    stddevs: org.apache.spark.sql.DataFrame = [id: double, rand_double: double ... 1 more field]
    columnsToKeep: Seq[org.apache.spark.sql.Column] = ArrayBuffer(id, rand_double)
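The transcript above converts one known column by hand. A generic version can inspect the schema and convert every date or timestamp column before computing stddev. This is a minimal sketch assuming Spark 2.x or later; the object and method names (`StddevFilter`, `datesToSeconds`, `dropConstantByStddev`) are hypothetical. The policy choices are also assumptions: stddev is only computed for numeric (or converted) columns, every other column and any column whose stddev is null is kept unchanged, and because unix_timestamp has whole-second resolution, timestamps that differ only below one second are treated as equal.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, stddev, unix_timestamp}
import org.apache.spark.sql.types.{DateType, NumericType, TimestampType}

// Hypothetical helper object: not part of Spark.
object StddevFilter {
  // Backtick-quote a name so dotted names such as "time.1" refer to one top-level column.
  private def q(name: String): Column = col(s"`$name`")

  // Replace each date/timestamp column with unix_timestamp (whole seconds, LongType).
  // Caveat: timestamps that differ only below one second become equal.
  def datesToSeconds(df: DataFrame): DataFrame =
    df.schema.fields.foldLeft(df) { (acc, field) =>
      field.dataType match {
        case DateType | TimestampType => acc.withColumn(field.name, unix_timestamp(q(field.name)))
        case _                        => acc
      }
    }

  // Drop numeric (or converted date/timestamp) columns whose sample stddev is exactly 0.0.
  // Other columns, and columns whose stddev is null, are kept unchanged.
  def dropConstantByStddev(df: DataFrame): DataFrame = {
    val converted = datesToSeconds(df)
    val numericNames = converted.schema.fields.collect {
      case f if f.dataType.isInstanceOf[NumericType] => f.name
    }
    if (numericNames.isEmpty) df
    else {
      // One aggregation pass producing a single row of stddev values.
      val row = converted.select(numericNames.map(n => stddev(q(n)).as(n)): _*).first()
      val constant = numericNames.indices.collect {
        case i if !row.isNullAt(i) && row.getDouble(i) == 0.0 => numericNames(i)
      }.toSet
      // Select from the original DataFrame so kept date/timestamp columns keep their type.
      df.select(df.columns.filterNot(constant.contains).map(q): _*)
    }
  }
}
```

On the question's data this would be called as `StddevFilter.dropConstantByStddev(df).show(5, false)`. Selecting from the original `df` rather than the converted one means surviving timestamp columns come back as timestamps, not as seconds.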
    

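    Using countDistinct

    That said, countDistinct is more general: it also works on non-numeric columns such as dates, timestamps and strings, so no conversion is needed, and a column with exactly one distinct value is constant: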

    scala> val distCounts = myDF.select(myDF.columns.map(c => countDistinct(c) as c): _*).first.toSeq.zip(myDF.columns)
    distCounts: Seq[(Any, String)] = ArrayBuffer((11,id), (11,rand_double), (1,Date))
    
    scala> distCounts.foldLeft(myDF)((accDF, dc_col) => if (dc_col._1 == 1) accDF.drop(dc_col._2) else accDF).show
    +---+-------------------+
    | id|        rand_double|
    +---+-------------------+
    |  0| 0.3786008989478248|
    |  1| 0.5968932024004612|
    |  2|0.05912760417456575|
    |  3|0.29974600653895667|
    |  4| 0.8448407414817856|
    |  5| 0.2049495659443249|
    |  6| 0.4184846380144779|
    |  7|0.21400484330739022|
    |  8| 0.9558142791013501|
    |  9|0.32530639391058036|
    | 10| 0.5100585655062743|
    +---+-------------------+
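The same countDistinct idea can be wrapped in a reusable function. This is a sketch under a few assumptions: the name `DistinctFilter.dropConstantColumns` is hypothetical, column names are backtick-quoted so a dotted name like the question's `time.1` resolves to a single column, and all counts are computed in one `select`. Because countDistinct ignores nulls, this version drops every column with at most one distinct non-null value (including all-null columns, whose count is 0), which is slightly broader than the `== 1` check above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, countDistinct}

// Hypothetical helper object: not part of Spark.
object DistinctFilter {
  // Drop every column holding at most one distinct non-null value.
  // countDistinct accepts non-numeric types (strings, dates, timestamps), so no conversion is needed.
  def dropConstantColumns(df: DataFrame): DataFrame = {
    val names = df.columns
    if (names.isEmpty) df
    else {
      // One aggregation pass; backticks keep dotted names such as "time.1" intact.
      val counts = df.select(names.map(n => countDistinct(col(s"`$n`")).as(n)): _*).first()
      // countDistinct skips nulls: an all-null column counts 0, a single value plus nulls counts 1.
      val toDrop = names.indices.collect { case i if counts.getLong(i) <= 1L => names(i) }
      // Keep the remaining columns, again quoting names so dotted names resolve correctly.
      df.select(names.filterNot(toDrop.contains).map(n => col(s"`$n`")): _*)
    }
  }
}
```

If a column that holds one value plus some nulls should be kept, change the condition to `== 1L` and handle the all-null case separately.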
    
