【问题标题】:How do I create a new column has the count of all the row values that are greater than 0 in pyspark?如何创建一个新列,其中包含 pyspark 中所有大于 0 的行值的计数?
【发布时间】:2022-02-02 19:27:45
【问题描述】:

假设我有一个 pyspark 数据框:

col1 col2  col3
1     2    -3
2     null  5
4     4     8
1     0     9

我想添加一个名为 check 的列,用于计算大于 0 的值的数量。

最终输出将是:

col1 col2  col3     check
    1     2    -3    2
    2     null  5    2
    4     4     8    3
    1     0     9    2

我正在尝试这个。但是,它没有帮助,错误如下:

df= df.withColumn("check", sum((df[col] > 0) for col in df.columns))

参数无效,不是字符串或列: 类型的 0x7f0a866ae580> 处。对于列文字, 使用“lit”、“array”、“struct”或“create_map”函数。

【问题讨论】:

    标签: dataframe apache-spark pyspark count apache-spark-sql


    【解决方案1】:

    不知道是否有更简单的基于 SQL 的解决方案,但使用 udf 非常简单。

    count_udf = udf(lambda arr: sum([1 for a in arr if a > 0]), IntegerType())
    df.withColumn('check', count_udf(array('col1', 'col2', 'col3'))).show()
    

    不确定它是否会处理空值。如果需要,在 udf 中添加空检查 (if a and a > 0)。

    想法:https://stackoverflow.com/a/42540401/496289


    您的代码显示您对非零列进行求和,而不是计数。如果你需要总和那么

    count_udf = udf(lambda arr: sum([a for a in arr if a > 0]), IntegerType())
    

    【讨论】:

    • 这不起作用。尝试显示时抛出错误:PythonException: 'TypeError: '>' not supported between 'NoneType' and 'int'', from , line 3. 完整的回溯如下:
    • @Messy, “不确定它是否会处理空值。如果需要,在 udf 中添加空值检查(如果 a 和 a > 0)。”
    【解决方案2】:

    创建一个新列arrayfilter新创建的列最后count列中的元素。

    Example:

    df.show(10,False)
    #+----+----+----+
    #|col1|col2|col3|
    #+----+----+----+
    #|1   |2   |-3  |
    #|2   |null|5   |
    #+----+----+----+
    
    df.withColumn("check",expr("size(filter(array(col1,col2), x -> x > 0))")).show(10,False)
    #+----+----+----+-----+
    #|col1|col2|col3|check|
    #+----+----+----+-----+
    #|1   |2   |-3  |2    |
    #|2   |null|5   |1    |
    #+----+----+----+-----+
    

    【讨论】:

      【解决方案3】:

      如果 > 0,您可以使用 functools.reduce 对来自 df.columns 的列列表求和,如下所示:

      from pyspark.sql import functions as F
      from operator import add
      from functools import reduce
      
      
      df = spark.createDataFrame([
          (1, 2, -3), (2, None, 5), (4, 4, 8), (1, 0, 9)
      ], ["col1", "col2", "col3"])
      
      df = df.withColumn(
          "check",
          reduce(add, [F.when(F.col(c) > 0, 1).otherwise(0) for c in df.columns])
      )
      
      df.show()
      #+----+----+----+-----+
      #|col1|col2|col3|check|
      #+----+----+----+-----+
      #|   1|   2|  -3|    2|
      #|   2|null|   5|    2|
      #|   4|   4|   8|    3|
      #|   1|   0|   9|    2|
      #+----+----+----+-----+
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-09-02
        • 2022-10-13
        • 2022-09-23
        • 1970-01-01
        • 1970-01-01
        • 2021-01-22
        • 2018-08-13
        • 1970-01-01
        相关资源
        最近更新 更多