【Question title】:Spark SQL: do an aggregation based on another aggregation
【Posted】:2021-09-26 21:43:43
【Question】:

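I have the DataFrame below. What I want to achieve is this: the values in the code column should be aggregated into a list only when at least one row for that name has a "Y" flag. I tried the SQL below, but it did not work. How can this be done? I have added comments to the code and the sample output below. Any help is much appreciated.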

Input:

name            code    flag
big bird          A       Y
elmo              B       N     
cookie monster    C       Y
cookie monster    D       N

Expected output:

name              hasYflag    Codelist
big bird            Y          A.   
elmo                N               //elmo does not have codelist as the flag is N
cookie monster      Y          C,D. //cookie monster has codelist as there is one Y (row 3 above) flag

I tried the following, but it does not work. I would like to do this with a Spark SQL query rather than with the Spark SQL (DataFrame) API:

select name,
case when max(flag) = "Y" then "Y" else "N" end as hasYflag
case when max(flag) = "Y" then sort_array(collect_set(code)) else null as Codelist
from df
groupby name

【Question discussion】:

    Tags: sql dataframe apache-spark apache-spark-sql dataset


    【Solution 1】:

    Try this:

        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions._
    
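        data
          // Attach the full set of codes for each name to every row of that name.
          .withColumn("code", collect_set("code") over Window.partitionBy("name"))
          // Rank rows within each name so that a "Y" row (if any) comes first.
          .withColumn("rank", row_number() over Window.partitionBy("name").orderBy(col("flag").desc))
          .where(col("rank") === 1)
          // Keep the code list only when the surviving row is flagged "Y"; otherwise null.
          .withColumn("code", when(col("flag") === lit("Y"), concat_ws(",", col("code"))))
          .withColumnRenamed("flag", "hasYFlag")
          .withColumnRenamed("code", "codeList")
          .select("name", "codeList", "hasYFlag")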
    

    Output:

    +--------------+--------+--------+
    |          name|codeList|hasYFlag|
    +--------------+--------+--------+
    |cookie monster|     C,D|       Y|
    |      big bird|       A|       Y|
    |          elmo|    null|       N|
    +--------------+--------+--------+
    

    As requested (edit):

    sparkSession.sql("""
      WITH dups AS (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY flag DESC) AS rn
        FROM (
          SELECT name, flag, CONCAT_WS(',', COLLECT_SET(code) OVER (PARTITION BY name)) AS code FROM tmp_table)
      )
      SELECT name, flag AS hasYFlag, IF(flag = 'Y', code, null) AS codeList FROM dups WHERE rn = 1
    """)
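
The query attempted in the question is close to a working single `GROUP BY` version: it lacks a comma after the first `CASE` expression, the second `CASE` has no `END`, and `GROUP BY` must be two words. A minimal sketch of that variant follows. It assumes that `flag` only ever holds `'Y'` or `'N'` (so `MAX(flag)` is `'Y'` exactly when some row has a `'Y'`) and that the data is registered as a temp view named `df`, as in the question. The local `SparkSession` setup and the sample rows are there only to make the snippet self-contained.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("conditional-code-list")
  .getOrCreate()
import spark.implicits._

// Sample rows from the question, registered under the view name used there.
Seq(
  ("big bird", "A", "Y"),
  ("elmo", "B", "N"),
  ("cookie monster", "C", "Y"),
  ("cookie monster", "D", "N")
).toDF("name", "code", "flag").createOrReplaceTempView("df")

val result = spark.sql(
  """
    |SELECT name,
    |       MAX(flag) AS hasYflag,
    |       -- Assumption: flag is only 'Y' or 'N', so MAX(flag) = 'Y' means "at least one Y".
    |       CASE WHEN MAX(flag) = 'Y'
    |            THEN CONCAT_WS(',', SORT_ARRAY(COLLECT_SET(code)))
    |       END AS Codelist   -- no ELSE branch, so names without a 'Y' get null
    |FROM df
    |GROUP BY name
    |""".stripMargin)

result.show()
```

`COLLECT_SET` does not guarantee element order, which is why the array is passed through `SORT_ARRAY` before being joined. If `flag` can take other values, `MAX(CASE WHEN flag = 'Y' THEN 1 ELSE 0 END) = 1` is a safer test than comparing `MAX(flag)` directly.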
    

    【Discussion】:

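    • Thanks, but is there a way to do this with SQL rather than the API?
    • By SQL I mean something like spark.sql("....") Many thanks
    • @user4046073 See the updated answer. If it fully answers your question, please mark it as the accepted answer :-)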