【发布时间】:2020-07-16 10:15:56
【问题描述】:
Given:无限的数据流,其中一些重复事件位于每个事件附近(就时间而言);重复事件具有相同的事件时间戳和相同的唯一 ID
目标:从流中删除重复项
我计划应用一个固定大小的窗口,然后通过唯一键对给定事件进行分组并执行早期触发。所以我有以下内容:
(
element
| "Add timestamp to event" >> ParDo(AddTimestampCustomFn()) # simply return TimistampedValue
| "Select unique key" >> Map(lambda elem: (elem.id, elem))
| "Apply window" >> WindowInto(
FixedWindows(50),
trigger=AfterCount(1),
accumulation_mod=AccumulationMode.DISCARDING,
allowed_lateness=0,
)
| "Group events by id" >> GroupByKey()
| "Print results" >> ParDo(CustomPrintFn()) # simply print the first element from the grouped elements along with timestamp
)
一切似乎都按预期工作,但我意识到即使经过一段时间后,仍然可以处理属于应该已经过去的时间窗口的事件。
假设我们有以下具有相应时间戳的事件:[('a', 0), ('a', 1), ('a', 40), ('b', 20), ('c', 90), ('d', '140'), ('f', 1)]。我希望我的输出类似于:[('a', 49), ('b', 49), ('c': 99), ('d', 149)]。但是,除了上述输出之外,我还收到了 f 事件。所以实际输出是[('a', 49), ('b', 49), ('c': 99), ('d', 149), ('f', 49)]。值得注意的是,分组事件的时间戳等于时间窗口的最后一次。我不太明白为什么我会收到f 事件。窗口是固定的,它的长度是 50 秒,allowed_lateness 设置为 0。我还假设水印应该已经通过了。因此,我真的无法理解为什么事件f 仍然存在?
我还尝试执行没有 group by 语句的代码。但是,它似乎会产生类似的结果。让我们将以下数据作为输入:[('a', 1), ('b', 90), ('c', 140), ('f', 1)]。那么结果仍然包括事件f:[('a', 49), ('b', 99), ('c', 149), ('f', 49)]。这是简化的代码:
(
element
| "Add timestamp to event" >> ParDo(AddTimestampCustomFn())
| "Apply window" >> WindowInto(FixedWindows(50))
| "Print results" >> ParDo(CustomPrintFn())
)
我觉得我对固定窗口的理解是错误的,但我不明白为什么。
【问题讨论】:
标签: python google-cloud-dataflow apache-beam stream-processing windowing