【问题标题】:Summing values across each row as boolean (PySpark)将每行的值求和为布尔值(PySpark)
【发布时间】:2020-04-28 05:05:40
【问题描述】:

我目前有一个 PySpark 数据框,其中包含许多由整数计数填充的列。其中许多列的计数为零。 我想找到一种方法来计算计数大于零的列数

换句话说,我想要一种对一行中的值求和的方法,其中给定行的所有列都是有效的布尔值(尽管可能不需要数据类型转换)。我的表中有几列是日期时间或字符串,所以理想情况下我会有一种首先选择数字列的方法。

当前数据框示例和所需输出

+---+---------- +----------+------------            
|USER|   DATE   |COUNT_COL1| COUNT_COL2|...     DESIRED COLUMN
+---+---------- +----------+------------ 
| b | 7/1/2019 |  12      |     1     |              2        (2 columns are non-zero)
| a | 6/9/2019 |  0       |     5     |              1
| c | 1/1/2019 |  0       |     0     |              0

Pandas:例如,在 pandas 中,这可以通过选择数字列、转换为 bool 并与 axis=1 求和来完成。我正在寻找一个 PySpark 等价物。

test_cols=list(pandas_df.select_dtypes(include=[np.number]).columns.values)
pandas_df[test_cols].astype(bool).sum(axis=1)

【问题讨论】:

  • 尝试类似:df.withColumn('cnt', sum((df[c[0]]>0).astype("int") for c in df.dtypes if c[1] in ['int','long'])).show()。您可以扩展列表以包括 double、float 等。

标签: python apache-spark pyspark


【解决方案1】:

对于数字,您可以通过使用 integer values 为所有列创建一个 array(使用 >df.dtypes),然后使用 higher order functions。在这种情况下,我使用 filter 去除所有的 0,然后使用 size 来获取每行所有非零元素的数量。(spark2.4+)

from pyspark.sql import functions as F
df.withColumn("arr", F.array(*[F.col(i[0]) for i in df.dtypes if i[1] in ['int','bigint']]))\
  .withColumn("DESIRED COLUMN", F.expr("""size(filter(arr,x->x!=0))""")).drop("arr").show()

#+----+--------+----------+----------+--------------+
#|USER|    DATE|COUNT_COL1|COUNT_COL2|DESIRED COLUMN|
#+----+--------+----------+----------+--------------+
#|   b|7/1/2019|        12|         1|             2|
#|   a|6/9/2019|         0|         5|             1|
#|   c|1/1/2019|         0|         0|             0|
#+----+--------+----------+----------+--------------+

【讨论】:

  • 效果很好,可以处理带有时间戳/日期的 spark 数据帧。谢谢
  • 很高兴帮助。感谢@jxc 提供使用 df.schema 的提示
【解决方案2】:

假设你有以下 df:

df.show()
df.printSchema()

+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|  a|  1|  2|  3|
|  a|  0|  2|  1|
|  a|  0|  0|  1|
|  a|  0|  0|  0|
+---+---+---+---+

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

使用 case when 语句,您可以检查列是否为数字,然后检查它是否大于 0。在下一步中,f.size 将返回计数,这要归功于f.array_remove,它只留下了带有True 值的列。

from pyspark.sql import functions as f
cols = [f.when(f.length(f.regexp_replace(f.col(x), '\\d+', '')) > 0, False).otherwise(f.col(x).cast('int') > 0) for x in df2.columns]
df.select("*", f.size(f.array_remove(f.array(*cols), False)).alias("count")).show()

+---+---+---+---+-----+
|_c0|_c1|_c2|_c3|count|
+---+---+---+---+-----+
|  a|  1|  2|  3|    3|
|  a|  0|  2|  1|    2|
|  a|  0|  0|  1|    1|
|  a|  0|  0|  0|    0|
+---+---+---+---+-----+

【讨论】:

  • 这段代码好像时间戳列有问题,不应该相加。特别的例外是:org.apache.spark.sql.AnalysisException: cannot resolve '(t1.`date` > 0)' due to data type mismatch: differing types in '(t1.`date` > 0)' (timestamp and int).;;
  • 对,我已经进行了更正,它适用于日期和时间戳类型。如果您的数据框在没有带有所有 String 类型列的 inferSchema 的情况下加载,我的解决方案将很有用。
猜你喜欢
  • 1970-01-01
  • 2011-01-29
  • 2022-11-29
  • 1970-01-01
  • 2012-06-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-05
相关资源
最近更新 更多