【发布时间】:2020-01-09 20:18:11
【问题描述】:
我有以下 PySpark DataFrame df:
itemid eventid timestamp timestamp_end n
134 30 2016-07-02 2016-07-09 2
134 32 2016-07-03 2016-07-10 2
125 32 2016-07-10 2016-07-17 1
我想把这个DataFrame转换成下面这样的:
itemid eventid timestamp_start timestamp timestamp_end
134 30 2016-07-02 2016-07-02 2016-07-09
134 32 2016-07-02 2016-07-03 2016-07-09
134 30 2016-07-03 2016-07-02 2016-07-10
134 32 2016-07-03 2016-07-03 2016-07-10
125 32 2016-07-10 2016-07-10 2016-07-17
基本上,对于itemid 的每个唯一值,我需要将timestamp 放入一个新列timestamp_start。因此,itemid 组中的每一行都应该重复n 次,其中n 是组中的记录数。希望我解释清楚。
这是我在 PySpark 中的初始 DataFrame:
from pyspark.sql.functions import col, expr
df = (
sc.parallelize([
(134, 30, "2016-07-02", "2016-07-09"), (134, 32, "2016-07-03", "2016-07-10"),
(125, 32, "2016-07-10", "2016-07-17"),
]).toDF(["itemid", "eventid", "timestamp", "timestamp_end"])
.withColumn("timestamp", col("timestamp").cast("timestamp"))
.withColumn("timestamp_end", col("timestamp_end").cast("timestamp_end"))
)
到目前为止,我设法复制了行 n 次:
new_df = df.withColumn("n", expr("explode(array_repeat(n,int(n)))"))
但是如何创建timestamp_start,如上例所示?
谢谢。
【问题讨论】:
标签: python pyspark pyspark-dataframes