【问题标题】:Apache Flink Watermark behaviour for TwoInputStreamOperator operatorTwoInputStreamOperator 运算符的 Apache Flink 水印行为
【发布时间】:2021-10-22 10:44:19
【问题描述】:

有 2 个数据流分配了时间戳,水印生成器定义如下。

val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[A] {
            override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[B] {
            override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

当这两个流在一个运算符中连接时,来自 streamA 或 streamB 的最小水印作为连接运算符的水印。

class CombineAB extends CoProcessFunction[A, B, C] {
   override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
   override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
}

val streamC: DataStream[C] = streamA.connect(streamB)
      .process(new CombineAB)

CombineAB 运算符的水印是AB 中的最小值。基于此,C 类型的元素被标记为迟到与否。

但是由于我们没有为C 附加任何时间戳,这是否意味着CombineAB 运算符中的所有元素都没有被标记为迟到?因此在 C 上开窗不会有任何迟到的记录被丢弃?

假设我们将一个时间戳分配和水印生成器附加到 C 如下,那么这是否意味着来自 A 和 B 的水印被完全忽略并且CombineAB 的水印仅取决于 C 的时间戳字段和用 C 定义的延迟.

     streamC.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[C] {
            override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
              element.updatedTime
            }
          })
      )

有没有办法可以将时间戳分配器附加到 C 并且 CombineAB 的水印仍然是 AB 的最小值,并且 C 的元素根据 C 分配的时间戳标记为迟CombineAB的wartermark


更新:CombineAB 的改进实现

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    几点:

    forBoundedOutOfOrderness[A](Duration.ofSeconds(0)) 不寻常。任何乱序事件都会迟到。为什么不使用forMonotonousTimestamps()

    CombineAB产生的记录会有时间戳;无需为此流应用assignTimestampsAndWatermarksCollector 产生的任何记录的时间戳就是传入记录的时间戳。

    如果你在流 C 上调用 assignTimestampsAndWatermarks,传入的水印将被过滤掉,你需要生成新的水印。

    【讨论】:

    • CollectAB 正在生成 C 类型的元素,我们没有提到 C 的哪个字段将用于提取其事件时间戳。那么 Flink 使用哪个时间戳作为 C 的时间戳。是不是,当 A 类型的元素被处理并且我们收集 C 类型的元素时,元素 A 的时间戳被分配给 C。同样,作为结果收集的 C 类型的元素传入类型 B 元素,使用 B 的时间戳。
    • 是的,这就是我想说的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多