【问题标题】:How to create streaming Beam pipeline that is triggered once and only once in a fixed interval如何创建在固定间隔内触发一次且仅触发一次的流式 Beam 管道
【发布时间】:2019-07-30 05:58:43
【问题描述】:

我需要创建一个 Apache Beam (Java) 流式传输作业,它应该每 60 秒启动一次(并且仅启动一次)。

我通过使用 GenerateSequence、Window 和 Combine 使用 DirectRunner 使其正常工作。

但是,当我在 Google Dataflow 上运行它时,有时会在 60 秒窗口内多次触发它。我猜这与延迟和乱序消息有关。

Pipeline pipeline = Pipeline.create(options);
pipeline
    // Jenerate a tick every 15 seconds
    .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(15)))
    // Just to check if individual ticks are being generated once every 15 second
    .apply(ParDo.of(new DoFn<Long, Long>() {
            @ProcessElement
            public void processElement(@Element Long tick, OutputReceiver<Long> out) {
                ZonedDateTime currentInstant = Instant.now().atZone(ZoneId.of("Asia/Jakarta"));
                LOG.warn("-" + tick + "-" + currentInstant.toString());
                out.output(word);
            }
        }
    ))
    // 60 Second window
    .apply("Window", Window.<Long>into(FixedWindows.of(Duration.standardSeconds(60))))
    // Emit once per 60 second 
    .apply("Cobmine window into one", Combine.globally(Count.<Long>combineFn()).withoutDefaults())
    .apply("START", ParDo.of(new DoFn<Long, ZonedDateTime>() {
            @ProcessElement
            public void processElement(@Element Long count, OutputReceiver<ZonedDateTime> out) {
                ZonedDateTime currentInstant = Instant.now().atZone(ZoneId.of("Asia/Jakarta"));
                // LOG just to check
                // This log is sometimes printed more than once within 60 seconds
                LOG.warn("x" + count + "-" + currentInstant.toString());
                out.output(currentInstant);
            }
        }
    ));

它大部分时间都有效,除了随机每 5 或 10 分钟一次,我在同一分钟内看到两个输出。如何确保上面的“START”每 60 秒运行一次?谢谢。

【问题讨论】:

    标签: java google-cloud-dataflow apache-beam


    【解决方案1】:

    简短回答:目前不能,Beam 模型专注于事件时间处理和延迟数据的正确处理。

    解决方法:您可以定义一个处理时间计时器,但您必须手动处理计时器和延迟数据的输出和处理,see thisthis

    更多详情:

    Beam 中的窗口和触发器通常在事件时间中定义,而不是在处理时间中。这样,如果您在已经为某个窗口发出结果之后有迟到的数据,迟到的数据仍然会出现在正确的窗口中,并且可以为该窗口重新计算结果。 Beam 模型允许您表达该逻辑,并且它的大部分功能都是为此量身定制的。

    这也意味着通常不需要 Beam 管道在某些特定的实际时间发出结果,例如说诸如“根据事件本身中的数据聚合属于某个窗口的事件,然后每分钟输出该窗口”之类的话是没有意义的。 Beam runner 为窗口聚合数据,可能会等待迟到的数据,然后在它认为正确时立即发出结果。数据准备好发出的条件由触发器指定。但仅此而已 - 当窗口数据准备好发出时的条件,它实际上并没有强制运行器发出它。因此,运行器可以在满足触发条件后的任何时间点发出它并且结果将是正确的,即如果自满足计时器条件后有更多事件到达,则只会处理属于具体窗口的事件在那个窗口中。

    事件时间窗口不适用于处理时间触发,并且 Beam 中没有方便的原语(触发器/窗口)来处理延迟数据的处理时间。在此模型中,如果您使用只触发一次的触发器,您会丢失后期数据,并且您仍然无法定义稳健的处理时间触发器。要构建类似的东西,您必须能够指定诸如开始测量处理时间的真实时间点之类的东西,并且您将不得不处理不同处理时间和可能发生的延迟问题大量的工人机器。目前这还不是 Beam 的一部分。

    Beam 社区中的一些努力将支持此用例,例如sink triggersretractions 这将允许您在事件时间空间中定义您的管道,但无需复杂的事件时间触发器。结果可以立即更新/重新计算并发出,或者可以在接收器上指定触发器,例如“我希望每分钟更新一次输出表”。结果将自动更新并重新计算延迟数据,无需您参与。尽管此时这些努力还远未完成,因此您目前最好的选择是使用existing triggers 之一或使用timers 手动处理所有内容。

    【讨论】:

    • 感谢@anton,我理解很难(或不可能)确定处理时间。但是是否有可能在事件时间上设置窗口并在事件时间中为事件(即 GenerateSequence)生成一次且仅一次的 Combine 输出? (可以忽略或等待固定的秒数,但在给定的 60 秒间隔内只发出一次窗口。乱序也可以。)
    猜你喜欢
    • 2011-08-13
    • 2020-08-25
    • 1970-01-01
    • 2022-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-22
    相关资源
    最近更新 更多