【问题标题】:How to apply groupby condition and get all the columns in the result?如何应用 groupby 条件并获取结果中的所有列?
【发布时间】:2018-07-10 14:35:20
【问题描述】:

我的数据框看起来像

+-------------------------+-----+
| Title| Status|Suite|ID  |Time |
+------+-------+-----+----+-----+
|KIM   | Passed|ABC  |123 |20   |
|KJT   | Passed|ABC  |123 |10   |
|ZXD   | Passed|CDF  |123 |15   |
|XCV   | Passed|GHY  |113 |36   |
|KJM   | Passed|RTH  |456 |45   |
|KIM   | Passed|ABC  |115 |47   |
|JY    | Passed|JHJK |8963|74   |
|KJH   | Passed|SNMP |256 |47   |
|KJH   | Passed|ABC  |123 |78   |
|LOK   | Passed|GHY  |456 |96   |
|LIM   | Passed|RTH  |113 |78   |
|MKN   | Passed|ABC  |115 |74   |
|KJM   | Passed|GHY  |8963|74   |
+------+-------+-----+----+-----+

可以使用

创建
df = sqlCtx.createDataFrame(
[
    ('KIM', 'Passed', 'ABC', '123',20),
    ('KJT', 'Passed', 'ABC', '123',10),
    ('ZXD', 'Passed', 'CDF', '123',15),
    ('XCV', 'Passed', 'GHY', '113',36),
    ('KJM', 'Passed', 'RTH', '456',45),
    ('KIM', 'Passed', 'ABC', '115',47),
    ('JY', 'Passed', 'JHJK', '8963',74),
    ('KJH', 'Passed', 'SNMP', '256',47),
    ('KJH', 'Passed', 'ABC', '123',78),
    ('LOK', 'Passed', 'GHY', '456',96),
    ('LIM', 'Passed', 'RTH', '113',78),
    ('MKN', 'Passed', 'ABC', '115',74),
    ('KJM', 'Passed', 'GHY', '8963',74),     
],('Title', 'Status', 'Suite', 'ID','Time')

)

我需要在 ID 上申请 group by,在时间上申请 aggregation,结果我还需要获取 Title、Status 和 Suite 以及 ID。

我的输出应该是这样的

+-------------------------+-----+
| Title| Status|Suite|  ID|Time |
+------+-------+-----+----+-----+
|KIM   | Passed|ABC  |123 |30.75|
|XCV   | Passed|GHY  |113 |57   |
|KJM   | Passed|RTH  |456 |70.5 | 
|KIM   | Passed|ABC  |115 |60.5 |
|JY    | Passed|JHJK |8963|74   |
|KJH   | Passed|SNMP |256 |47   |
+------+-------+-----+----+-----+

我已经尝试了下面的代码。但它只是在结果中给了我 ID 中的值

df.groupBy("ID").agg(mean("Time").alias("Time"))

【问题讨论】:

  • 我在 ID 上执行 group by,在 Runtime 上执行 mean。我还需要另一列以及 ID 和运行时。状态和标题的值可能会有所不同。我刚选了第一个有ID的

标签: apache-spark pyspark pyspark-sql


【解决方案1】:

通过修改后的预期输出,您可以使用first 获得任意值:

from pyspark.sql.functions import avg, first

df.groupBy("id").agg(
    first("Title"), first("Status"), first("Suite"), avg("Time")
).toDF("id", "Title", "Status", "Suite", "Time").show()
# +----+-----+------+-----+-----+
# |  id|Title|Status|Suite| Time|
# +----+-----+------+-----+-----+
# | 113|  XCV|Passed|  GHY| 57.0|
# | 256|  KJH|Passed| SNMP| 47.0|
# | 456|  KJM|Passed|  RTH| 70.5|
# | 115|  KIM|Passed|  ABC| 60.5|
# |8963|   JY|Passed| JHJK| 74.0|
# | 123|  KIM|Passed|  ABC|30.75|
# +----+-----+------+-----+-----+

原答案

看来你想drop_duplicates:

df.drop_duplicates(subset=["ID"]).show()
# +-----+------+-----+----+                                                       
# |Title|Status|Suite|  ID|
# +-----+------+-----+----+
# |  XCV|Passed|  GHY| 113|
# |  KJH|Passed| SNMP| 256|
# |  KJM|Passed|  RTH| 456|
# |  KIM|Passed|  ABC| 115|
# |   JY|Passed| JHJK|8963|
# |  KIM|Passed|  ABC| 123|
# +-----+------+-----+----+

如果要使用特定行请参考Find maximum row per group in Spark DataFrame

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-06-22
    • 2023-01-12
    • 2020-11-13
    • 2020-10-16
    • 2019-03-23
    • 2021-08-26
    • 2021-10-15
    • 1970-01-01
    相关资源
    最近更新 更多