【问题标题】:Estimating Watermark for Event Time in Beam估计 Beam 中事件时间的水印
【发布时间】:2020-12-09 14:13:07
【问题描述】:

我正在尝试使用 Beam 使用数据中的事件时间和 Kafka 作为数据源来聚合一组数据。如果我所有的 kafka 分区都填充了数据,则此方法有效。但是,一旦尚未写入分区,就无法估计和推进水印。我的 TimeStampPolicy 如下:

public class CustomTimeStampPolicy
    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
  protected Instant currentWatermark;

  public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }


  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return this.currentWatermark;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    System.out.println("Current Watermark: " + this.currentWatermark);
    return this.currentWatermark;
  }
}

使用 3 个 Kafka 分区,其中只有一个填充了数据,我的日志显示这些水印:

Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z

默认触发我的窗口不会触发。我的猜测是输出水印是分区水印的最小值。因此,只要我的一些分区是空的,就不会前进。如何通过事件时间处理来处理空分区?

【问题讨论】:

    标签: timestamp apache-beam watermark stream-processing apache-beam-kafkaio


    【解决方案1】:

    如果没有数据写入 Kafka 分区,Beam 无法知道一旦写入一个元素,它就不会有过去任意的时间戳,因此是非常古老的水印。

    您可以尝试将时间戳策略构造函数更新为 previousWatermark.orElse(wallTime - someMaximumSkew)

    其中someMaximumSkew 是写入 kafka 的数据可能出现的最大延迟。您还可以考虑在一段时间内没有数据写入时取之前的值(如果有)和wallTime - someMaximumSkew 的最小值。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-18
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多