【Question title】: Pivot String column on Pyspark Dataframe
【Posted】: 2016-09-25 23:54:09
【Question】:

I have a simple dataframe like this:

rdd = sc.parallelize(
    [
        (0, "A", 223,"201603", "PORT"), 
        (0, "A", 22,"201602", "PORT"), 
        (0, "A", 422,"201601", "DOCK"), 
        (1,"B", 3213,"201602", "DOCK"), 
        (1,"B", 3213,"201601", "PORT"), 
        (2,"C", 2321,"201601", "DOCK")
    ]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

df_data.show()
+---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

I need to pivot it by date:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show()

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|2321.0|  null|  null|
|  0|   A| 422.0|  22.0| 223.0|
|  1|   B|3213.0|3213.0|  null|
+---+----+------+------+------+

Everything works as expected. But now I need to pivot it and get a non-numeric column:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show()

Of course I get an exception:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;'

I would like to generate something along the lines of:

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|  DOCK|  null|  null|
|  0|   A|  DOCK|  PORT|  PORT|
|  1|   B|  PORT|  DOCK|  null|
+---+----+------+------+------+

Is this possible with pivot?

【Comments】:

    Tags: python apache-spark dataframe pyspark apache-spark-sql


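    【Solution 1】:

    Assuming that (id | type | date) combinations are unique and your only goal is to pivot, not to aggregate, you can use first (or any other function that is not restricted to numeric values):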

    from pyspark.sql.functions import first
    
    (df_data
        .groupby(df_data.id, df_data.type)
        .pivot("date")
        .agg(first("ship"))
        .show())
    
    ## +---+----+------+------+------+
    ## | id|type|201601|201602|201603|
    ## +---+----+------+------+------+
    ## |  2|   C|  DOCK|  null|  null|
    ## |  0|   A|  DOCK|  PORT|  PORT|
    ## |  1|   B|  PORT|  DOCK|  null|
    ## +---+----+------+------+------+
    

    If these assumptions do not hold, you have to pre-aggregate your data. For example, to get the most common ship value:

    from pyspark.sql.functions import max, struct
    
    (df_data
        .groupby("id", "type", "date", "ship")
        .count()
        .groupby("id", "type")
        .pivot("date")
        .agg(max(struct("count", "ship")))
        .show())
    
    ## +---+----+--------+--------+--------+
    ## | id|type|  201601|  201602|  201603|
    ## +---+----+--------+--------+--------+
    ## |  2|   C|[1,DOCK]|    null|    null|
    ## |  0|   A|[1,DOCK]|[1,PORT]|[1,PORT]|
    ## |  1|   B|[1,PORT]|[1,DOCK]|    null|
    ## +---+----+--------+--------+--------+
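The max(struct("count", "ship")) trick works because Spark orders structs field by field, so the count is compared first and the ship name only breaks ties. A minimal pure-Python sketch of the same idea, using tuples in place of structs; the duplicated rows below are hypothetical and are not part of the question's data:

```python
from collections import Counter

# Hypothetical rows with repeated (id, type, date) keys, so that one
# pivot cell has several candidate ship values.
rows = [
    (0, "A", "201601", "DOCK"),
    (0, "A", "201601", "PORT"),
    (0, "A", "201601", "PORT"),
    (0, "A", "201602", "PORT"),
]

# Step 1: count each (id, type, date, ship) combination, mirroring
# groupby("id", "type", "date", "ship").count().
counts = Counter(rows)

# Step 2: for each (id, type, date) cell keep the largest (count, ship)
# pair. Tuples compare element by element, so the count decides first
# and the ship name is only used to break ties.
best = {}
for (id_, type_, date, ship), n in counts.items():
    key = (id_, type_, date)
    best[key] = max(best.get(key, (0, "")), (n, ship))

print(best[(0, "A", "201601")])  # (2, 'PORT')
```

Note that with equal counts the alphabetically larger ship value wins, which is an arbitrary but deterministic tie-break.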
    

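    【Comments】:

    • Another solution is collect_set, which keeps all ship values.
    • @Jacek, could you post that solution here?
    • @stack0114106 Replace max(struct above with collect_set and you're done. I'm looking for an opportunity to post it as a full answer. Do you know of any question that needs such an answer? ;-)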
    【Solution 2】:

    Just in case someone is looking for a SQL-style approach.

    rdd = spark.sparkContext.parallelize(
        [
            (0, "A", 223,"201603", "PORT"), 
            (0, "A", 22,"201602", "PORT"), 
            (0, "A", 422,"201601", "DOCK"), 
            (1,"B", 3213,"201602", "DOCK"), 
            (1,"B", 3213,"201601", "PORT"), 
            (2,"C", 2321,"201601", "DOCK")
        ]
    )
    df_data = spark.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])
    df_data.createOrReplaceTempView("df")
    df_data.show()
    
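    # Collect the distinct date values that will become the pivot columns.
    dt_vals = spark.sql("select collect_set(date) from df").collect()[0][0]
    # e.g. ['201601', '201602', '201603'] (collect_set does not guarantee order)
    
    # Build the quoted, comma-separated list for the IN clause.
    dt_vals_colstr = ",".join(["'" + c + "'" for c in sorted(dt_vals)])
    # "'201601','201602','201603'"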
    

    Part 1 (note the f-string prefix)

    spark.sql(f"""
    select * from 
    (select id , type, date, ship from df)
    pivot (
    first(ship) for date in ({dt_vals_colstr})
    )
    """).show(100,truncate=False)
    
    +---+----+------+------+------+
    |id |type|201601|201602|201603|
    +---+----+------+------+------+
    |1  |B   |PORT  |DOCK  |null  |
    |2  |C   |DOCK  |null  |null  |
    |0  |A   |DOCK  |PORT  |PORT  |
    +---+----+------+------+------+
    

    Part 2

    spark.sql(f"""
    select * from 
    (select id , type, date, ship from df)
    pivot (
    case when count(*)=0 then null 
    else struct(count(*),first(ship)) end for date in ({dt_vals_colstr})
    )
    """).show(100,truncate=False)
    
    +---+----+---------+---------+---------+
    |id |type|201601   |201602   |201603   |
    +---+----+---------+---------+---------+
    |1  |B   |[1, PORT]|[1, DOCK]|null     |
    |2  |C   |[1, DOCK]|null     |null     |
    |0  |A   |[1, DOCK]|[1, PORT]|[1, PORT]|
    +---+----+---------+---------+---------+
    

    【Comments】:
