【问题标题】:Events being fired after end of the window when processing unbounded stream of data处理无限数据流时在窗口结束后触发的事件
【发布时间】:2020-07-16 10:15:56
【问题描述】:

Given:无限的数据流,其中一些重复事件位于每个事件附近(就时间而言);重复事件具有相同的事件时间戳和相同的唯一 ID
目标:从流中删除重复项

我计划应用一个固定大小的窗口,然后通过唯一键对给定事件进行分组并执行早期触发。所以我有以下内容:

(
    element
    | "Add timestamp to event" >> ParDo(AddTimestampCustomFn())    # simply return TimistampedValue
    | "Select unique key" >> Map(lambda elem: (elem.id, elem))
    | "Apply window" >> WindowInto(
        FixedWindows(50),
        trigger=AfterCount(1),
        accumulation_mod=AccumulationMode.DISCARDING,
        allowed_lateness=0,
    )
    | "Group events by id" >> GroupByKey()
    | "Print results" >> ParDo(CustomPrintFn())    # simply print the first element from the grouped elements along with timestamp
)

一切似乎都按预期工作,但我意识到即使经过一段时间后,仍然可以处理属于应该已经过去的时间窗口的事件。

假设我们有以下具有相应时间戳的事件:[('a', 0), ('a', 1), ('a', 40), ('b', 20), ('c', 90), ('d', '140'), ('f', 1)]。我希望我的输出类似于:[('a', 49), ('b', 49), ('c': 99), ('d', 149)]。但是,除了上述输出之外,我还收到了 f 事件。所以实际输出是[('a', 49), ('b', 49), ('c': 99), ('d', 149), ('f', 49)]。值得注意的是,分组事件的时间戳等于时间窗口的最后一次。我不太明白为什么我会收到f 事件。窗口是固定的,它的长度是 50 秒,allowed_lateness 设置为 0。我还假设水印应该已经通过了。因此,我真的无法理解为什么事件f 仍然存在?

我还尝试执行没有 group by 语句的代码。但是,它似乎会产生类似的结果。让我们将以下数据作为输入:[('a', 1), ('b', 90), ('c', 140), ('f', 1)]。那么结果仍然包括事件f:[('a', 49), ('b', 99), ('c', 149), ('f', 49)]。这是简化的代码:

(
    element
    | "Add timestamp to event" >> ParDo(AddTimestampCustomFn())
    | "Apply window" >> WindowInto(FixedWindows(50))
    | "Print results" >> ParDo(CustomPrintFn())
)

我觉得我对固定窗口的理解是错误的,但我不明白为什么。

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam stream-processing windowing


    【解决方案1】:

    水印行为取决于来源;它可能并没有像你想象的那样进步,所以这里的所有数据可能都是准时的,没有被丢弃。

    特别是,如果您使用 Create,水印从负无穷开始,所有元素都注入管道,然后水印前进到正无穷。这可确保任何数据都不会延迟,无论您为 Create 提供的顺序如何。

    如果您想显式控制元素注入和水印以进行测试,您可以使用TestStream

    【讨论】:

    • 我明白了,我实际上是在使用TestStream 在单元测试中推进我的水印,但它仍然没有产生预期的结果。假设我有以下消息:message = [('a', 1), ('b', 90), ('c', 140), ('f', 1)]。然后我创建测试流:test_stream = TestStream.advance_watermark(1).add_elements(messages[:3]).advance_watermark(1000).add_elements(messages[3:]) 之后我执行我在问题中指定的管道。但是,由于某种原因,它仍然会收到位于窗口 [0, 50) 中的事件 f
    • 是简化代码还是原始代码?分组和后期数据丢弃发生在 Window 变换后的第一个 GroupByKey 中,而不是立即发生。
    • 这是没有GroupByKey子句的简化版代码
    • 是的,没有 GBK,这是预期的行为,因为延迟数据丢弃发生在 GBK 中。如果 GBK 仍然发生这种情况,那么说明发生了其他事情。
    猜你喜欢
    • 2018-03-08
    • 2016-06-18
    • 2011-10-10
    • 1970-01-01
    • 1970-01-01
    • 2012-10-26
    • 2013-01-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多