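I think what you need is a sliding window with a length of 300 days and a period of 1 day. Because you are reading from GCS and the timestamps are stored in a column, you first have to attach them as element timestamps (metadata) so that Beam knows when each element was "generated". In the Java SDK this is done with `WithTimestamps`; in the Python SDK you wrap each element in `window.TimestampedValue`.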
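To see why a 300-day window with a 1-day period works, it helps to know which windows a single timestamp ends up in. The sketch below is a plain-Python approximation of the assignment rule used by Beam's `SlidingWindows(size, period, offset=0)`: windows are aligned to the Unix epoch (plus the optional offset), and an element belongs to every window `[start, start + size)` that covers its timestamp. The helper name `sliding_windows` is hypothetical and is not part of Beam.

```python
from datetime import datetime, timezone

SECONDS_IN_DAY = 60 * 60 * 24


def sliding_windows(ts, size, period, offset=0):
    """Return the [start, end) windows (epoch seconds) that contain ts.

    Hypothetical helper approximating how sliding windows are laid out
    from the epoch: the latest aligned start at or before ts, then one
    period back at a time while the window still covers ts.
    """
    start = ts - (ts - offset) % period
    windows = []
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows


ts = int(datetime(2020, 12, 10, tzinfo=timezone.utc).timestamp())
wins = sliding_windows(ts, 300 * SECONDS_IN_DAY, SECONDS_IN_DAY)
print(len(wins))  # 300
```

So with a period of 1 day every reading contributes to 300 overlapping windows, and each window's mean covers the 300 days before its end.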
Here is an example with a window length of 10 days and a period of 1 day (so it's not that large):
```python
from datetime import datetime

import apache_beam as beam
from apache_beam import window

seconds_in_day = 60 * 60 * 24

elements = [
    {"temperature": 0, "timestamp": datetime(2020, 12, 1, 0, 0).timestamp()},  # 1st Dec at 0:00
    {"temperature": 30, "timestamp": datetime(2020, 12, 2, 0, 0).timestamp()},
    {"temperature": 10, "timestamp": datetime(2020, 12, 10, 0, 0).timestamp()},
    {"temperature": 20, "timestamp": datetime(2020, 12, 23, 0, 0).timestamp()},
    {"temperature": 5, "timestamp": datetime(2020, 12, 27, 0, 0).timestamp()},
    {"temperature": 30, "timestamp": datetime(2020, 12, 31, 0, 0).timestamp()},  # 31st Dec at 0:00
]


def get_window(element, timestamp=beam.DoFn.TimestampParam):
    # The combined value carries the window's max timestamp (just before the window end);
    # int() truncates it to whole seconds, so +1 gives back the window end.
    return {"timestamp": datetime.fromtimestamp(int(timestamp) + 1), "avg_temperature_last_10days": element}


with beam.Pipeline() as p:
    (p
     | beam.Create(elements)
     | beam.Map(lambda x: window.TimestampedValue(x["temperature"], x["timestamp"]))  # Adds timestamp to element's metadata
     | "window" >> beam.WindowInto(window.SlidingWindows(10 * seconds_in_day, 1 * seconds_in_day))  # Window of 10 days, with a period of 1 day
     | beam.combiners.Mean.Globally().without_defaults()  # One mean per non-empty window
     | "get window value" >> beam.Map(get_window)
     | beam.Map(print)
     )
```
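For your GCS data, the `beam.Create(elements)` step and the first `beam.Map` have to be replaced by reading the file and turning each row into a value plus an epoch timestamp. The sketch below assumes a hypothetical CSV layout `timestamp,temperature` with ISO-8601 timestamps, and assumes that timestamps without a UTC offset are meant as UTC; adjust `parse_row` (a hypothetical helper) to your actual columns.

```python
import csv
from datetime import datetime, timezone


def parse_row(line):
    """Turn one CSV line of the (hypothetical) form 'timestamp,temperature'
    into a (temperature, epoch_seconds) pair."""
    ts_str, temp_str = next(csv.reader([line]))
    dt = datetime.fromisoformat(ts_str)
    if dt.tzinfo is None:
        # Assumption: timestamps without an offset are meant as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return float(temp_str), dt.timestamp()


print(parse_row("2020-12-10T00:00:00,10"))  # (10.0, 1607558400.0)
```

In the pipeline this could be wired as `p | beam.io.ReadFromText("gs://your-bucket/temperatures.csv", skip_header_lines=1) | beam.Map(parse_row) | beam.Map(lambda t: window.TimestampedValue(t[0], t[1]))`, followed by `beam.WindowInto(window.SlidingWindows(300 * seconds_in_day, seconds_in_day))` for the 300-day case; the bucket path is a placeholder.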
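The output should look something like this (one line per window; Beam does not guarantee the order in which they are printed). Note that sliding windows are aligned to the Unix epoch, i.e. UTC midnight, while `datetime(...).timestamp()` and `datetime.fromtimestamp()` use the local time zone of the machine, so the exact dates and averages you get may shift by a day depending on where you run it: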
```
[..]
{'timestamp': datetime.datetime(2020, 12, 10, 0, 0), 'avg_temperature_last_10days': 13.333333333333334}
{'timestamp': datetime.datetime(2020, 12, 9, 0, 0), 'avg_temperature_last_10days': 15.0}
{'timestamp': datetime.datetime(2020, 12, 8, 0, 0), 'avg_temperature_last_10days': 15.0}
[..]
{'timestamp': datetime.datetime(2020, 12, 27, 0, 0), 'avg_temperature_last_10days': 12.5}
{'timestamp': datetime.datetime(2020, 12, 26, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2020, 12, 25, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2020, 12, 24, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2020, 12, 23, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2021, 1, 5, 0, 0), 'avg_temperature_last_10days': 17.5}
{'timestamp': datetime.datetime(2021, 1, 4, 0, 0), 'avg_temperature_last_10days': 17.5}
[..]
```
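If you want to sanity-check the pipeline's numbers without Beam, a brute-force version is easy to write: for each window end, average the readings whose timestamp `t` satisfies `end - size <= t < end`, and skip empty windows as `without_defaults()` does. This is a hypothetical helper, not a Beam API, and it uses UTC-aware datetimes throughout, so the window ends it reports may differ by a day from a run that builds its timestamps from local-time `datetime`s.

```python
from datetime import datetime, timedelta, timezone

DAY = timedelta(days=1)
utc = timezone.utc


def brute_force_means(points, size, period, first_end, last_end):
    """Mean of the values in each half-open window [end - size, end),
    for end = first_end, first_end + period, ..., up to last_end.
    Empty windows are skipped, mimicking Mean.Globally().without_defaults()."""
    out = {}
    end = first_end
    while end <= last_end:
        vals = [v for t, v in points if end - size <= t < end]
        if vals:
            out[end] = sum(vals) / len(vals)
        end += period
    return out


points = [
    (datetime(2020, 12, 1, tzinfo=utc), 0),
    (datetime(2020, 12, 2, tzinfo=utc), 30),
    (datetime(2020, 12, 10, tzinfo=utc), 10),
    (datetime(2020, 12, 23, tzinfo=utc), 20),
    (datetime(2020, 12, 27, tzinfo=utc), 5),
    (datetime(2020, 12, 31, tzinfo=utc), 30),
]
means = brute_force_means(points, 10 * DAY, DAY,
                          datetime(2020, 12, 2, tzinfo=utc),
                          datetime(2021, 1, 10, tzinfo=utc))
for end, avg in sorted(means.items()):
    print(end.date(), avg)
```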