【问题标题】:Early triggering and chained aggregations in Apache BeamApache Beam 中的早期触发和链式聚合
【发布时间】:2018-05-24 09:36:56
【问题描述】:

是否可以在 Apache Beam 中组合/链接具有不同窗口和触发的多个聚合。

示例:

我有一个PCollection<KV<String, Long>> 的输入,我需要计算 2 个总和:1 分钟固定窗口和每 1 分钟 1 小时滑动窗口,我想每分钟得到推测结果。

触发器:

Trigger trigger =
    Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                // Speculative every ONE_MINUTE
                .plusDelayOf(ONE_MINUTE))
        // final result past watermark
        .orFinally(AfterWatermark.pastEndOfWindow());

有一个输入:PCollection<KV<String, Long>> input 我可以通过 2 个聚合来完成:

PCollection<KV<String, Long>> oneMinSum = input
    .apply(Window.into(1 min).triggering(trigger))
    .apply(Sum.longsPerKey())

PCollection<KV<String, Long>> slidingSum = input
    .apply(Window.into(1 hour sliding 1 min).triggering(trigger))
    .apply(Sum.longsPerKey())

但在这种情况下,第二次聚合将对一分钟求和中已经使用的完全相同的数据进行求和,如果我可以使用 oneMinSum 作为滑动聚合的输入,我会节省大量 CPU,但这不起作用,第二个聚合计算早期触发窗格和最终窗格的总和,重复计算并产生不正确的滑动总和。

完整的测试用例在这里:https://gist.github.com/anonymous/2920e870a02abcbec51e10c3fd293236

输出

key=a value=1
    window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z)
    pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
key=a value=5
    window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z)
    pane=PaneInfo{timing=EARLY, index=1}
key=a value=7
    window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z)
    pane=PaneInfo{isLast=true, timing=ON_TIME, index=2, onTimeIndex=0}

我见过的所有示例都假定Window.into 仅应用于PCollection 一次,并且在计算聚合后,结果会进入一些存储(例如 BigQuery 等),我从未见过任何示例“链接”聚合和多次更改窗口。

Beam 编程模型的用例是否正确?或者 Beam 编程模型假设 Window.into(...).triggering(...) 只会被指定一次?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    TLDR:要解决此问题,请在第一个聚合中使用 discardingFiredPanes

    您指定的触发器与您的想法不同。 AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE)) 在窗格中的第一个元素后 1 分钟触发,但由于您还使用了 accumulatingFiredPanes()Repeatedly.forever(...),触发器将触发

    (i) 第一次出现在第一个元素之后 1 分钟

    (ii) 每次满足 (i) 之后有新元素时

    对于您的测试用例,我注释了 oneMinSum 发生的情况

    .advanceWatermarkTo(t0)
    .addElements(KV.of("a", 1))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 1) because of (i)
    .addElements(KV.of("a", 1))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 2) because of (ii)
    .advanceWatermarkTo(t1)             // emit another (a, 2) past watermark
    .addElements(KV.of("a", 1))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 1) because of (i)
    .advanceWatermarkToInfinity();      // emit another (a, 1) past watermark
    

    (a, 2)(a, 1)的双重发射解释here

    对于onHourSums,你得到

    • key=a value=1 因为提前触发
    • key=a value=5 (1+2+2) 因为提前触发
    • key=a value=7 (1+2+2+1+1) 因为窗口关闭

    【讨论】:

      猜你喜欢
      • 2018-05-25
      • 1970-01-01
      • 1970-01-01
      • 2020-01-01
      • 2023-02-02
      • 2021-09-29
      • 1970-01-01
      • 2021-12-19
      • 1970-01-01
      相关资源
      最近更新 更多