【发布时间】:2021-10-09 18:13:09
【问题描述】:
我正在尝试通过每 30 秒获取一次总和来汇总我的数据。我想知道这个聚合的结果是否为零,如果在那个 30s 区域中没有行,就会发生这种情况。
这是一个最小的工作示例,说明了我想要使用 pandas 的结果,以及它在 pyspark 中的不足之处。
输入数据
import pandas as pd
from pyspark.sql import functions as F
df = pd.DataFrame(
[
(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-10T15:27:29+00:00"),
(25, "2017-03-10T15:27:30+00:00"),
(101, "2017-03-10T15:29:00+00:00"),
(99, "2017-03-10T15:29:29+00:00")
],
columns=["dollars", "timestamp"],
)
df["timestamp"] = pd.to_datetime(df["timestamp"])
print(df)
dollars timestamp
0 17 2017-03-10 15:27:18+00:00
1 13 2017-03-10 15:27:29+00:00
2 25 2017-03-10 15:27:30+00:00
3 101 2017-03-10 15:29:00+00:00
4 99 2017-03-10 15:29:29+00:00
熊猫解决方案
使用 pandas,我们可以使用 resample 来聚合每 30 秒的窗口,然后在这些窗口上应用 sum 函数(注意 2017-03-10 15:28:00+00:00 和 2017-03-10 15:28:30+00:00 的结果):
desired_result = df.set_index("timestamp").resample("30S").sum()
desired_result
dollars
timestamp
2017-03-10 15:27:00+00:00 30
2017-03-10 15:27:30+00:00 25
2017-03-10 15:28:00+00:00 0
2017-03-10 15:28:30+00:00 0
2017-03-10 15:29:00+00:00 200
PySpark 接近解决方案
在 pyspark 中,我们可以使用 pyspark.sql.functions.window 每 30 秒进行一次窗口化(改编,感谢 this stack answer),但这会错过没有行的窗口:
spark: pyspark.sql.session.SparkSession # I expect you to have set up your session...
sdf = spark.createDataFrame(df)
sdf.groupby(
F.window("timestamp", windowDuration="30 seconds", slideDuration="30 seconds")
).agg(F.sum("dollars")).display()
window,sum(dollars)
"{""start"":""2017-03-10T15:27:30.000+0000"",""end"":""2017-03-10T15:28:00.000+0000""}",25
"{""start"":""2017-03-10T15:27:00.000+0000"",""end"":""2017-03-10T15:27:30.000+0000""}",30
"{""start"":""2017-03-10T15:29:00.000+0000"",""end"":""2017-03-10T15:29:30.000+0000""}",200
问题
如何让 pyspark 返回没有行的时间窗口的窗口结果(如 pandas)?
【问题讨论】:
标签: python pandas dataframe apache-spark pyspark