【Posted】: 2019-07-30 05:58:43
【Question】:
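I need to create an Apache Beam (Java) streaming job that fires once (and only once) every 60 seconds.
I got it working on the DirectRunner by combining GenerateSequence, Window, and Combine.
However, when I run it on Google Dataflow, it is sometimes triggered more than once within a 60-second window. My guess is that this is related to late or out-of-order messages.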
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

Pipeline pipeline = Pipeline.create(options);
pipeline
    // Generate a tick every 15 seconds
    .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(15)))
    // Just to check that individual ticks are generated once every 15 seconds
    .apply(ParDo.of(new DoFn<Long, Long>() {
        @ProcessElement
        public void processElement(@Element Long tick, OutputReceiver<Long> out) {
            ZonedDateTime currentInstant = Instant.now().atZone(ZoneId.of("Asia/Jakarta"));
            LOG.warn("-" + tick + "-" + currentInstant.toString());
            out.output(tick);
        }
    }))
    // 60-second fixed window
    .apply("Window", Window.<Long>into(FixedWindows.of(Duration.standardSeconds(60))))
    // Emit once per 60 seconds
    .apply("Combine window into one", Combine.globally(Count.<Long>combineFn()).withoutDefaults())
    .apply("START", ParDo.of(new DoFn<Long, ZonedDateTime>() {
        @ProcessElement
        public void processElement(@Element Long count, OutputReceiver<ZonedDateTime> out) {
            ZonedDateTime currentInstant = Instant.now().atZone(ZoneId.of("Asia/Jakarta"));
            // Log just to check
            // This log is sometimes printed more than once within 60 seconds
            LOG.warn("x" + count + "-" + currentInstant.toString());
            out.output(currentInstant);
        }
    }));
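It works most of the time, except that at random, roughly once every 5 or 10 minutes, I see two outputs within the same minute. How can I make sure the "START" step above runs only once every 60 seconds? Thanks.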
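For reference, the grouping the pipeline expects can be modelled in plain Java (this is not Beam code). The sketch assumes each tick's event timestamp is roughly the time it was generated and that the fixed windows have zero offset; the class and method names (`FixedWindowModel`, `windowStart`, `countPerWindow`) are hypothetical. Under those assumptions, each 60-second event-time window receives exactly four 15-second ticks, so each window should yield a single count of 4. Note that the "START" step logs wall-clock time (`Instant.now()`), not the window an output belongs to.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of 60 s fixed windows over 15 s ticks (not Beam code).
class FixedWindowModel {
    // Window size and tick interval taken from the pipeline in the question.
    static final long WINDOW_MILLIS = 60_000L;
    static final long TICK_MILLIS = 15_000L;

    // Start of the fixed window containing a timestamp, assuming a zero window offset:
    // timestamp minus (timestamp mod window size).
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Counts ticks per window, i.e. what a per-window count would emit for each window.
    static Map<Long, Long> countPerWindow(long firstTickMillis, int numTicks) {
        Map<Long, Long> counts = new TreeMap<>();
        for (int i = 0; i < numTicks; i++) {
            long ts = firstTickMillis + i * TICK_MILLIS;
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical start aligned to a window boundary: 12 ticks span 3 windows.
        Map<Long, Long> counts = countPerWindow(0L, 12);
        counts.forEach((start, n) ->
            System.out.println("window [" + start + ", " + (start + WINDOW_MILLIS) + ") -> " + n + " ticks"));
    }
}
```

Logging the window alongside the wall-clock time in the real pipeline would show whether two outputs seen in the same minute actually belong to different event-time windows.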
【Discussion】:
Tags: java google-cloud-dataflow apache-beam