【问题标题】:How to create N duplicated rows in PySpark DataFrame?如何在 PySpark DataFrame 中创建和重复行?
【发布时间】:2020-01-09 20:18:11
【问题描述】:

我有以下 PySpark DataFrame df

itemid  eventid    timestamp     timestamp_end   n
134     30         2016-07-02    2016-07-09      2
134     32         2016-07-03    2016-07-10      2
125     32         2016-07-10    2016-07-17      1

我想把这个DataFrame转换成下面这样的:

itemid  eventid    timestamp_start   timestamp     timestamp_end
134     30         2016-07-02        2016-07-02    2016-07-09
134     32         2016-07-02        2016-07-03    2016-07-09
134     30         2016-07-03        2016-07-02    2016-07-10
134     32         2016-07-03        2016-07-03    2016-07-10
125     32         2016-07-10        2016-07-10    2016-07-17

基本上,对于itemid 的每个唯一值,我需要将timestamp 放入一个新列timestamp_start。因此,itemid 组中的每一行都应该重复n 次,其中n 是组中的记录数。希望我解释清楚。

这是我在 PySpark 中的初始 DataFrame:

from pyspark.sql.functions import col, expr

df = (
    sc.parallelize([
        (134, 30, "2016-07-02", "2016-07-09"), (134, 32, "2016-07-03", "2016-07-10"),
        (125, 32, "2016-07-10", "2016-07-17"),
    ]).toDF(["itemid", "eventid", "timestamp", "timestamp_end"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .withColumn("timestamp_end", col("timestamp_end").cast("timestamp_end"))
)

到目前为止,我设法复制了行 n 次:

new_df = df.withColumn("n", expr("explode(array_repeat(n,int(n)))"))

但是如何创建timestamp_start,如上例所示?

谢谢。

【问题讨论】:

    标签: python pyspark pyspark-dataframes


    【解决方案1】:

    IIUC,你可以使用Window函数collect_list来查找一个组中所有timestamp+timestamp_end的列表,然后使用SparkSQL内置函数inline/inline_outer来分解得到的structs数组:

    from pyspark.sql.functions import collect_list, expr
    from pyspark.sql import Window
    
    w1 = Window.partitionBy('itemid')
    
    df.withColumn('timestamp_range',  
        collect_list(expr("(timestamp as timestamp_start, timestamp_end)")).over(w1)
     ).selectExpr(
        'itemid',  
        'eventid', 
        'timestamp', 
        'inline_outer(timestamp_range)'
     ).show()    
    +------+-------+----------+---------------+-------------+
    |itemid|eventid| timestamp|timestamp_start|timestamp_end|
    +------+-------+----------+---------------+-------------+
    |   134|     30|2016-07-02|     2016-07-02|   2016-07-09|
    |   134|     30|2016-07-02|     2016-07-03|   2016-07-10|
    |   134|     32|2016-07-03|     2016-07-02|   2016-07-09|
    |   134|     32|2016-07-03|     2016-07-03|   2016-07-10|
    |   125|     32|2016-07-10|     2016-07-10|   2016-07-17|
    +------+-------+----------+---------------+-------------+
    

    其中: timestamp_range 是以下 named_struct 的 collect_list(在 SparkSQL 语法中):

    (timestamp as timestamp_start, timestamp_end)
    

    与以下相同:

    named_struct('timestamp_start', timestamp, 'timestamp_end', timestamp_end)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-12-03
      • 1970-01-01
      • 1970-01-01
      • 2018-07-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多