【发布时间】:2017-01-03 12:27:55
【问题描述】:
我有一个 Pub/Sub 主题 + 订阅,并希望在 Dataflow 中使用和聚合订阅中的无限数据。我使用固定窗口并将聚合写入 BigQuery。
读写(没有窗口和聚合)工作正常。但是当我将数据传输到一个固定窗口(计算每个窗口中的元素)时,窗口永远不会是triggered。因此没有写入聚合。
这是我的单词发布者(它使用来自examples 的 kinglear.txt 作为输入文件):
public static class AddCurrentTimestampFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
}
}
public static class ExtractWordsFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z']+");
for (String word:words){ if(!word.isEmpty()){ c.output(word); }}
}
}
// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
.apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
.apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
.apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();
这是我的窗口字数计数器:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
.withSchema(o.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
Window.Bound<String> w = Window
.<String>into(FixedWindows.of(Duration.standardSeconds(1)));
p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
.apply("FixedWindow", w)
.apply("CountWords", Count.<String>perElement())
.apply("CreateRows", ParDo.of(new WordCountToRowFn()))
.apply("WriteRows", tablePipe);
p.run();
上述订阅者将不起作用,因为窗口似乎没有使用default trigger 触发。但是,如果我手动定义触发器,则代码会起作用,并且计数会写入 BigQuery。
Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();
如果可能,我希望避免指定自定义触发器。
问题:
- 为什么我的解决方案不适用于 Dataflow 的 default trigger?
- 如何更改我的发布者或订阅者以使用default trigger 触发窗口?
【问题讨论】:
-
你有工作ID吗?
-
请注意,您的触发器将在一次触发后“完成”,删除所有进一步的数据。要匹配默认触发器,您需要
Repeatedly.forever(AfterWatermark.pastEndOfWindow())(如果您的允许延迟为零,则实际上不会重复) -
刚刚用这个触发器测试过。但是基于水印的东西在所描述的场景中不起作用。 “CountWords”步骤的内部
Combine.GroupedValues操作永远不会执行;即使在离开管道运行 > 10 分钟之后。 “CountWord”的GroupByKey动作是最后执行的动作。我现在将调查/尝试 Ben 提出的timestampLabel方法。 @jkff:作业ID为:2017-01-04_07_33_56-14457038567248221079
标签: google-cloud-dataflow apache-beam