【发布时间】:2018-09-14 14:55:04
【问题描述】:
我不知道如何使用 pyspark 过滤列中的正值或负值,你能帮忙吗?
我有一个包含 10MM+ 行和 50+ 列的 spark 数据框,需要计算一个特定列中的值等于或小于 0 的次数。
提前致谢。
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql
我不知道如何使用 pyspark 过滤列中的正值或负值,你能帮忙吗?
我有一个包含 10MM+ 行和 50+ 列的 spark 数据框,需要计算一个特定列中的值等于或小于 0 的次数。
提前致谢。
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql
对于您要定位的列,您可以简单地过滤数据框,当值为 <= 0 时,并计算符合条件的行数。
import pyspark.sql.functions as func
df.filter(func.col("colname") <= 0).count()
【讨论】:
您可以使用以下解决方案通过 pyspark 过滤和计算 Spark 数据帧中的负值和正值:
df.filter(col("colname") <= 0).count() //or
df.filter("colname <= 0").count()
两者都应该工作。
【讨论】:
我不得不为一个大表(60m+ 记录,3000+ 列)做类似的事情,并且计算每列的计数太耗时了。 相反,我将每一行映射到 0 或 1;如果值为负,则为 1,否则为 0。 然后就是把这个转化后的Dataframe加起来,结果就代表了每一列值为负数的地方的个数。
这是 scala 中的示例
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row(-4.0, 5.0, -2.0),
Row(4.0, -5.0, -2.0),
Row(-4.0, 5.0, -2.0))),
StructType(List(
StructField("col1", DoubleType, true),
StructField("col2", DoubleType, true),
StructField("col3", DoubleType, true)
))
)
val columns = df.columns
val transformedSchema = StructType(columns.map(col => StructField(col, LongType)))
val transformedDf = df.map(row => {
val transformed = columns.map(col =>
if (row.getDouble(row.fieldIndex(col)) < 0.0) 1L else 0L)
Row.fromSeq(transformed)
})(RowEncoder.apply(transformedSchema))
输出:
scala> df.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
|-4.0| 5.0|-2.0|
| 4.0|-5.0|-2.0|
|-4.0| 5.0|-2.0|
+----+----+----+
scala> transformedDf.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 0| 1|
| 0| 1| 1|
| 1| 0| 1|
+----+----+----+
scala> transformedDf.groupBy().sum().show()
+---------+---------+---------+
|sum(col1)|sum(col2)|sum(col3)|
+---------+---------+---------+
| 2| 1| 3|
+---------+---------+---------+
【讨论】: