使用stddev
stddev 仅在数字列上定义。如果您想计算 date 列的标准差,您需要先将其转换为时间戳:
scala> var myDF = (0 to 10).map(x => (x, scala.util.Random.nextDouble)).toDF("id", "rand_double")
myDF: org.apache.spark.sql.DataFrame = [id: int, rand_double: double]
scala> myDF = myDF.withColumn("Date", current_date())
myDF: org.apache.spark.sql.DataFrame = [id: int, rand_double: double ... 1 more field]
scala> myDF.printSchema
root
|-- id: integer (nullable = false)
|-- rand_double: double (nullable = false)
|-- Date: date (nullable = false)
scala> myDF.show
+---+-------------------+----------+
| id| rand_double| Date|
+---+-------------------+----------+
| 0| 0.3786008989478248|2017-03-21|
| 1| 0.5968932024004612|2017-03-21|
| 2|0.05912760417456575|2017-03-21|
| 3|0.29974600653895667|2017-03-21|
| 4| 0.8448407414817856|2017-03-21|
| 5| 0.2049495659443249|2017-03-21|
| 6| 0.4184846380144779|2017-03-21|
| 7|0.21400484330739022|2017-03-21|
| 8| 0.9558142791013501|2017-03-21|
| 9|0.32530639391058036|2017-03-21|
| 10| 0.5100585655062743|2017-03-21|
+---+-------------------+----------+
scala> myDF = myDF.withColumn("Date", unix_timestamp($"Date"))
myDF: org.apache.spark.sql.DataFrame = [id: int, rand_double: double ... 1 more field]
scala> myDF.printSchema
root
|-- id: integer (nullable = false)
|-- rand_double: double (nullable = false)
|-- Date: long (nullable = true)
scala> myDF.show
+---+-------------------+----------+
| id| rand_double| Date|
+---+-------------------+----------+
| 0| 0.3786008989478248|1490072400|
| 1| 0.5968932024004612|1490072400|
| 2|0.05912760417456575|1490072400|
| 3|0.29974600653895667|1490072400|
| 4| 0.8448407414817856|1490072400|
| 5| 0.2049495659443249|1490072400|
| 6| 0.4184846380144779|1490072400|
| 7|0.21400484330739022|1490072400|
| 8| 0.9558142791013501|1490072400|
| 9|0.32530639391058036|1490072400|
| 10| 0.5100585655062743|1490072400|
+---+-------------------+----------+
此时所有列都是数字,因此您的代码将运行良好:
scala> :pa
// Entering paste mode (ctrl-D to finish)
val aggs = myDF.columns.map(p=>stddev(s"`${p}`").as(p))
val stddevs = myDF.select(aggs: _*)
val columnsToKeep: Seq[Column] = stddevs.first // Take first row
.toSeq // convert to Seq[Any]
.zip(myDF.columns) // zip with column names
.collect {
// keep only names where stddev != 0
case (s: Double, c) if s != 0.0 => col(c)
}
myDF.select(columnsToKeep: _*).show(false)
// Exiting paste mode, now interpreting.
+---+-------------------+
|id |rand_double |
+---+-------------------+
|0 |0.3786008989478248 |
|1 |0.5968932024004612 |
|2 |0.05912760417456575|
|3 |0.29974600653895667|
|4 |0.8448407414817856 |
|5 |0.2049495659443249 |
|6 |0.4184846380144779 |
|7 |0.21400484330739022|
|8 |0.9558142791013501 |
|9 |0.32530639391058036|
|10 |0.5100585655062743 |
+---+-------------------+
aggs: Array[org.apache.spark.sql.Column] = Array(stddev_samp(id) AS `id`, stddev_samp(rand_double) AS `rand_double`, stddev_samp(Date) AS `Date`)
stddevs: org.apache.spark.sql.DataFrame = [id: double, rand_double: double ... 1 more field]
columnsToKeep: Seq[org.apache.spark.sql.Column] = ArrayBuffer(id, rand_double)
使用countDistinct
话虽如此,使用countDistinct 会更通用:
scala> val distCounts = myDF.select(myDF.columns.map(c => countDistinct(c) as c): _*).first.toSeq.zip(myDF.columns)
distCounts: Seq[(Any, String)] = ArrayBuffer((11,id), (11,rand_double), (1,Date))]
scala> distCounts.foldLeft(myDF)((accDF, dc_col) => if (dc_col._1 == 1) accDF.drop(dc_col._2) else accDF).show
+---+-------------------+
| id| rand_double|
+---+-------------------+
| 0| 0.3786008989478248|
| 1| 0.5968932024004612|
| 2|0.05912760417456575|
| 3|0.29974600653895667|
| 4| 0.8448407414817856|
| 5| 0.2049495659443249|
| 6| 0.4184846380144779|
| 7|0.21400484330739022|
| 8| 0.9558142791013501|
| 9|0.32530639391058036|
| 10| 0.5100585655062743|
+---+-------------------+