【发布时间】:2021-10-22 10:44:19
【问题描述】:
有 2 个数据流分配了时间戳,水印生成器定义如下。
val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[B] {
override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
当这两个流在一个运算符中连接时,来自 streamA 或 streamB 的最小水印作为连接运算符的水印。
class CombineAB extends CoProcessFunction[A, B, C] {
override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
}
val streamC: DataStream[C] = streamA.connect(streamB)
.process(new CombineAB)
CombineAB 运算符的水印是A 或B 中的最小值。基于此,C 类型的元素被标记为迟到与否。
但是由于我们没有为C 附加任何时间戳,这是否意味着CombineAB 运算符中的所有元素都没有被标记为迟到?因此在 C 上开窗不会有任何迟到的记录被丢弃?
假设我们将一个时间戳分配和水印生成器附加到 C 如下,那么这是否意味着来自 A 和 B 的水印被完全忽略并且CombineAB 的水印仅取决于 C 的时间戳字段和用 C 定义的延迟.
streamC.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[C] {
override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
element.updatedTime
}
})
)
有没有办法可以将时间戳分配器附加到 C 并且 CombineAB 的水印仍然是 A 和 B 的最小值,并且 C 的元素根据 C 分配的时间戳标记为迟CombineAB的wartermark
更新:CombineAB 的改进实现
【问题讨论】:
标签: apache-flink flink-streaming