【问题标题】:Consuming unbounded data in windows with default trigger使用默认触发器在窗口中使用无限数据
【发布时间】:2017-01-03 12:27:55
【问题描述】:

我有一个 Pub/Sub 主题 + 订阅,并希望在 Dataflow 中使用和聚合订阅中的无限数据。我使用固定窗口并将聚合写入 BigQuery。

读写(没有窗口和聚合)工作正常。但是当我将数据传输到一个固定窗口(计算每个窗口中的元素)时,窗口永远不会是triggered。因此没有写入聚合。

这是我的单词发布者(它使用来自examples 的 kinglear.txt 作为输入文件):

public static class AddCurrentTimestampFn extends DoFn<String, String> {
    @ProcessElement public void processElement(ProcessContext c) {
        c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
    }
}

public static class ExtractWordsFn extends DoFn<String, String> {
    @ProcessElement public void processElement(ProcessContext c) {
        String[] words = c.element().split("[^a-zA-Z']+");
        for (String word:words){ if(!word.isEmpty()){ c.output(word); }}
    }
}

// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
        .apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
        .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
        .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();

这是我的窗口字数计数器:

Pipeline p = Pipeline.create(o); // 'o' are the pipeline options

BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
        .withSchema(o.getSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

Window.Bound<String> w = Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)));

p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
        .apply("FixedWindow", w)
        .apply("CountWords", Count.<String>perElement())
        .apply("CreateRows", ParDo.of(new WordCountToRowFn()))
        .apply("WriteRows", tablePipe);
p.run();

上述订阅者将不起作用,因为窗口似乎没有使用default trigger 触发。但是,如果我手动定义触发器,则代码会起作用,并且计数会写入 BigQuery。

Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();

如果可能,我希望避免指定自定义触发器。

问题:

  1. 为什么我的解决方案不适用于 Dataflow 的 default trigger
  2. 如何更改我的发布者或订阅者以使用default trigger 触发窗口?

【问题讨论】:

  • 你有工作ID吗?
  • 请注意,您的触发器将在一次触发后“完成”,删除所有进一步的数据。要匹配默认触发器,您需要Repeatedly.forever(AfterWatermark.pastEndOfWindow())(如果您的允许延迟为零,则实际上不会重复)
  • 刚刚用这个触发器测试过。但是基于水印的东西在所描述的场景中不起作用。 “CountWords”步骤的内部Combine.GroupedValues 操作永远不会执行;即使在离开管道运行 > 10 分钟之后。 “CountWord”的GroupByKey 动作是最后执行的动作。我现在将调查/尝试 Ben 提出的 timestampLabel 方法。 @jkff:作业ID为:2017-01-04_07_33_56-14457038567248221079

标签: google-cloud-dataflow apache-beam


【解决方案1】:

您如何确定触发器永远不会触发?

您的PubSubIO.WritePubSubIO.Read 转换都应使用withTimestampLabel 指定时间戳标签,否则您添加的时间戳将不会写入PubSub,而将使用发布时间。

无论哪种方式,管道的输入水印都将来自在 PubSub 中等待的元素的时间戳。处理完所有输入后,它将保留几分钟(以防发布者出现延迟),然后再进入实时状态。

您可能会看到所有元素都在同一个大约 1 秒的窗口中发布(因为输入文件非常小)。这些都是相对较快的读取和处理,但是它们放入的 1 秒窗口要等到输入水印前进后才会触发,说明该 1 秒窗口中的所有数据都已被消耗掉。

这要等几分钟后才会发生,这可能会使触发器看起来好像不起作用。您编写的触发器在 1 秒的处理时间后触发,这会更早触发,但不能保证所有数据都已处理。

从默认触发器获得更好行为的步骤:

  1. 在写入和读取 pubsub 步骤中都使用 withTimestampLabel
  2. 让发布者进一步分散时间戳(例如,运行几分钟并将时间戳分散到该范围内)

【讨论】:

  • 如何机械地“在该范围内传播时间戳”?我从 rabbitmq 而不是 pubsub 读到了类似的问题,但我认为我的一般问题是相同的:消息出现在相同的“事件时间”,我已经修复了 1 秒的窗口,但是 GBK 操作永远不会完成,因为它似乎永远等待让水印前进。
猜你喜欢
  • 2022-12-07
  • 2020-01-01
  • 2011-01-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-11
  • 2012-05-25
  • 1970-01-01
相关资源
最近更新 更多