【问题标题】:Google dataflow 2.0 pubsub handler late dataGoogle dataflow 2.0 pubsub 处理程序延迟数据
【发布时间】:2018-02-09 15:06:15
【问题描述】:

我有一个关于 goolge 数据流的问题。

我正在编写一个从 PubSub 读取数据并写入 BigQuery 的数据流管道,它可以正常工作。
现在我必须处理延迟数据,我在互联网上关注了一些示例,但它无法正常工作,这是我的代码:

pipeline.apply(PubsubIO.readStrings()
            .withTimestampAttribute("timestamp").fromSubscription(Constants.SUBSCRIBER))
        .apply(ParDo.of(new ParseEventFn()))        
        .apply(Window.<Entity> into(FixedWindows.of(WINDOW_SIZE))
            // processing of late data.
            .triggering(
                    AfterWatermark
                            .pastEndOfWindow()
                            .withEarlyFirings(
                                    AfterProcessingTime
                                            .pastFirstElementInPane()
                                            .plusDelayOf(DELAY_SIZE))
                            .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(ALLOW_LATE_SIZE)
            .accumulatingFiredPanes())
        .apply(ParDo.of(new ParseTableRow()))
        .apply("Write to BQ", BigQueryIO.<TableRow>write()...

这是我的发布订阅消息:

{
...,
"timestamp" : "2015-08-31T09:52:25.005Z"
}

当我手动推送一些消息(转到 PupsubTopic 并发布)时,时间戳为

【问题讨论】:

  • 我不太明白你的问题。你能改写/编辑它以更清楚地说明问题所在吗?
  • 对不起,如果我的解释不清楚。
  • 我的问题是我在从 pubsub 调用数据时添加了海关时间戳,但是当我应用触发器以获取当前事件时间约 10 分钟的时间戳的延迟数据时,我之前使用时间戳手动推送了所有数据15 分钟(或更多时间之前的任何数据)仍然在 ParseTableRow 方法中收集到我的数据列表中。请帮我解释一下,我认为没有触发触发或者我缺少一些处理lata数据的代码?感谢您的建议。

标签: google-cloud-dataflow


【解决方案1】:

您应该使用“持续时间”对象正式指定允许的延迟时间:.withAllowedLateness(Duration.standardMinutes(ALLOW_LATE_SIZE)),假设您已经设置了 ALLOW_LATE_SIZE 的值(以分钟为单位)。

您可以查看文档page 中的“Google Cloud Dataflow SDK for Java”,特别是“触发器”子章节。

【讨论】:

  • 嗨,乔治,这是按分钟计算的持续时间,我刚刚定义了使用另一个数据流是恒定的。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-10-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-05
相关资源
最近更新 更多