【问题标题】:Rolling aggregate for bounded datasets with Apache Beam使用 Apache Beam 滚动聚合有界数据集
【发布时间】:2020-12-24 16:24:17
【问题描述】:

我有一个有界数据集(例如,在 Google Cloud Storage 中)。数据有两列,时间戳和温度,不适合内存。您如何使用 Beam 计算滚动聚合(例如,300 天滚动平均温度)?我不确定使用窗口和边输入是否是有界数据集的最佳解决方案。

【问题讨论】:

  • 你检查过滑动窗口吗?它看起来有 300 天的长度和 1 天的期限

标签: google-cloud-dataflow apache-beam dataflow


【解决方案1】:

我认为您需要的是长度为 300 天、周期为 1 天的滑动窗口。由于您要从 GCS 读取并且时间戳位于列中,因此您需要添加时间戳元数据,以便 Beam 知道元素何时“生成”,这是通过 WithTimestamps 完成的。

这里有一个示例,窗口长度为 10 天,周期为 1 天(所以它没有那么大):

    seconds_in_day = 60 * 60 * 24

    elements = [
        {"temperature": 0, "timestamp": datetime(2020, 12, 1, 0, 0).timestamp()},  # 1st Dec at 0:00
        {"temperature": 30, "timestamp": datetime(2020, 12, 2, 0, 0).timestamp()},
        {"temperature": 10, "timestamp": datetime(2020, 12, 10, 0, 0).timestamp()},
        {"temperature": 20, "timestamp": datetime(2020, 12, 23, 0, 0).timestamp()},
        {"temperature": 5, "timestamp": datetime(2020, 12, 27, 0, 0).timestamp()},
        {"temperature": 30, "timestamp": datetime(2020, 12, 31, 0, 0).timestamp()},  # 31th Dec at 0:00
    ]


    def get_window(element,  timestamp=beam.DoFn.TimestampParam):
        return {"timestamp": datetime.fromtimestamp(int(timestamp) + 1), "avg_temperature_last_10days": element}

    (p | beam.Create(elements)
       | beam.Map(lambda x: beam.window.TimestampedValue(x["temperature"], x['timestamp']))  # Adds timestamp to element's  metadata
       | 'window' >> beam.WindowInto(window.SlidingWindows(10 * seconds_in_day, 1 * seconds_in_day))  # Window of 10 days, with a Period of 1 day
       | beam.combiners.Mean.Globally().without_defaults()
       | "get window value" >> beam.Map(get_window)
       | beam.Map(print)
    )

生成的输出应该是这样的:

[..]
{'timestamp': datetime.datetime(2020, 12, 10, 0, 0), 'avg_temperature_last_10days': 13.333333333333334}
{'timestamp': datetime.datetime(2020, 12, 9, 0, 0), 'avg_temperature_last_10days': 15.0}
{'timestamp': datetime.datetime(2020, 12, 8, 0, 0), 'avg_temperature_last_10days': 15.0}
[..]
{'timestamp': datetime.datetime(2020, 12, 27, 0, 0), 'avg_temperature_last_10days': 12.5}
{'timestamp': datetime.datetime(2020, 12, 26, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2020, 12, 25, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2020, 12, 24, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2020, 12, 23, 0, 0), 'avg_temperature_last_10days': 20.0}
{'timestamp': datetime.datetime(2021, 1, 5, 0, 0), 'avg_temperature_last_10days': 17.5}
{'timestamp': datetime.datetime(2021, 1, 4, 0, 0), 'avg_temperature_last_10days': 17.5}
[..]

【讨论】:

  • 当数据在同一天不定期有多个记录时会发生什么?使用小于 1 天的周期?小了多少?如问题中所述,我不确定使用滑动窗口是否是最通用的解决方案。
  • windows 所做的是根据时间戳对 PCollection 进行切片。滑动窗口的长度为 10 天,周期为 1 天,一个窗口从第 1 天到第 11 天,另一个窗口从第 2 天到第 12 天,另一个窗口从第 3 天到第 13 天,依此类推。如果您在第一天有两个元素,它们将落在同一个窗口中。请注意,在执行 Mean 时,具有相同窗口的所有元素的权重相同,如果您不希望这样做,您可以先应用 1 天的 FixedWindow,然后再应用 SlidingWindow。看起来你有一些具体的想法,请用示例更新 Q,我将修改代码
  • 如果您添加示例输入和输出,我将修改答案以或多或少匹配。我不确定我是否理解你提到的边缘情况
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-06-29
  • 2020-06-10
  • 2018-05-24
  • 1970-01-01
  • 2018-12-27
  • 1970-01-01
相关资源
最近更新 更多