【问题标题】:pyspark - TypeError: count() takes exactly 1 argument (2 given)pyspark - TypeError: count() 只需要 1 个参数(给定 2 个)
【发布时间】:2020-12-25 14:50:31
【问题描述】:

我正在连接一堆列并计算它们。我不能指望别名吗?

df.select(F.col("_c21"),F.concat(F.col("id1"),F.lit("|"),F.col("id2"),F.lit("|"),F.col("id3"),F.lit("|"),F.col("id4").alias("ids")))
df.repartition(col("_c21"])).count("ids").over(Window.partitionBy("_c21"))

数据看起来像这样

+--------------------+--------------------------------------------+
|                _c21|concat(id1, |, id2, |, id3, |, id4 AS `ids`)|
+--------------------+--------------------------------------------+
|roBMSlo...|                                  US|WA|98115|Centu...|
|3Vzlfim...|                                  FR|56|56130|SFR.....|
|rgBdftS...|                                  CA|NB|E1A|Bell Ca...|

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    使用F.count,而不是数据框的count 方法(计算总行数)。也不需要重新分区,因为窗口无论如何都会进行分区。而且您还放错了别名的括号。

    import pyspark.sql.functions as F
    
    df1 = df.select(
        F.col("_c21"),
        F.concat(
            F.col("id1"),F.lit("|"),F.col("id2"),F.lit("|"),F.col("id3"),F.lit("|"),F.col("id4")
        ).alias("ids")    # misplaced close bracket here
    )
    
    df2 = df1.select(F.count("ids").over(Window.partitionBy("_c21")))
    
    # or if you want an additional column, use 
    df2 = df1.withColumn("count_id", F.count("ids").over(Window.partitionBy("_c21")))
    

    其实concat_ws更合适:

    df1 = df.select(
        F.col("_c21"),
        F.concat_ws(
            "|",
            F.col("id1"), F.col("id2"), F.col("id3"), F.col("id4")
        ).alias("ids")
    )
    

    【讨论】:

    • 无法解析 (_c21, count(ids) OVER (PARTITION BY _c21 unspecifiedframe$())) 中的列名“count”;
    • 很确定错误是因为计数函数没有应用别名。
    • @VK_217 抱歉,第一个代码 sn-p 的最后一行有错字 - 应该是 df1.select,而不是 df.select。你能再试一次吗?
    • 最终使用 group by df.repartition(F.col("_cid")).groupby("_c21","ids").count()。谢谢@mck,感谢您的建议
    • 干杯 @VK_217 ,下次当您在 SO 上发布问题时,最好包含一个示例数据框和一个预期的输出数据框 - 这将有助于我们更快地找到答案:)
    【解决方案2】:

    使用 spark-sql。它提高了可读性、可移植性并且易于调试。

    示例输入:

    df = spark.sql(""" with t1 (
     select  'roBMSlo' c1,   'US' c2,   'WA' c3,   '98115' c4,    'Centuy' c5    union all
     select  '3Vzlfim' c1,   'FR' c2,   '56' c3,   '56130' c4,   'SFR' c5    union all
     select  'rgBdftS' c1,   'CA'  c2,  'NB' c3,   'E1A' c4,   'Bell Ca'  c5    
      )  select   c1  _c21  ,   c2  id1   ,   c3 id2  ,   c4  id3   ,   c5 id4      from t1
    """)
    df.show(truncate=False)
    df.createOrReplaceTempView("df")
    
    +-------+---+---+-----+-------+
    |_c21   |id1|id2|id3  |id4    |
    +-------+---+---+-----+-------+
    |roBMSlo|US |WA |98115|Centuy |
    |3Vzlfim|FR |56 |56130|SFR    |
    |rgBdftS|CA |NB |E1A  |Bell Ca|
    +-------+---+---+-----+-------+
    
    spark.sql("""
    select _c21, ids, count(ids) over(partition by _c21) cw from (
    select _c21, concat(id1,id2,id3,id4) ids  from df )
    """).show()
    
    +-------+---------------+---+
    |   _c21|            ids| cw|
    +-------+---------------+---+
    |roBMSlo|USWA98115Centuy|  1|
    |3Vzlfim|   FR5656130SFR|  1|
    |rgBdftS| CANBE1ABell Ca|  1|
    +-------+---------------+---+
    

    如果你想用分隔符加入,

    spark.sql("""
    select _c21, ids, count(ids) over(partition by _c21) cw from (
    select _c21, concat_ws("|",id1,id2,id3,id4) ids  from df )
    """).show()
    

    【讨论】:

      猜你喜欢
      • 2018-08-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多