【发布时间】:2021-12-23 06:41:45
【问题描述】:
我有两个流,流 A 和流 B。两个流都包含相同类型的事件,具有 ID 和时间戳。现在,我想要 flink 工作做的就是在 1 分钟的窗口内加入具有相同 ID 的事件。水印是在事件中分配的。
sourceA = initialSourceA.map(parseToEvent)
sourceB = initialSourceB.map(parseToEvent)
streamA = sourceA
.assignTimestampsAndWatermarks(CustomWatermarkStrategy())
.keyBy(Event.Key)
streamB = sourceB
.assignTimestampsAndWatermarks(CustomWatermarkStrategy())
.keyBy(Event.Key)
streamA
.join(streamB)
.where(Event.Key)
.equalTo(Event.Key)
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MINUTES)))
.apply(giveMePairOfEvents)
.print()
在我的测试中,我尝试发送以下内容:
sourceA.send(Event(ID_1, 0 seconds))
sourceB.send(Event(ID_1, 0 seconds))
//to increase the watermark
sourceA.send(Event(ID_1, 62 seconds))
sourceB.send(Event(ID_1, 62 seconds))
对于并行度 = 1,我可以看到从时间 0 开始的事件被连接在一起。
但是,对于并行度 = 2,打印不会显示任何加入的内容。为了解决这个问题,我尝试在每个流的 keyBy 之后打印事件,我可以看到它们都在同一个实例上运行。在水印之后放置打印,原因很明显,事件当前在不同的实例上。
这让我相信在水印方面我做错了什么,因为对于高于 1 的并行度,它不会增加水印。所以这是我问自己的几个问题:
- 是否每个事件都有单独的水印生成器,我必须专门增加它们?
- 我是否先运行 keyBy,然后运行 watermark,以便每个流中的事件使用相同的 watermarkgenerator?
发送另一组事件如下:
sourceA.send(Event(ID_1, 0 seconds))
sourceB.send(Event(ID_1, 0 seconds))
//to increase the watermark
sourceA.send(Event(ID_1, 62 seconds))
sourceB.send(Event(ID_1, 62 seconds))
sourceA.send(Event(ID_1, 122 seconds))
sourceB.send(Event(ID_1, 122 seconds))
最终发送加入的第一个事件。进一步检查表明,第三组事件使用了第二组未使用的相同水印生成器。我不太清楚为什么会发生的事情。在 Flink 中使用 join 函数时如何正确分配和增加水印?
编辑 1:
自定义水印生成器:
class CustomWaterMarkGenerator(
private val maxOutOfOrderness: Long,
private var currentMaxTimeStamp: Long = 0,
)
: WatermarkGenerator<EventType> {
override fun onEvent(event: EventType, eventTimestamp: Long, output: WatermarkOutput) {
val a = currentMaxTimeStamp.coerceAtLeast(eventTimestamp)
currentMaxTimeStamp = a
output.emitWatermark(Watermark(currentMaxTimeStamp - maxOutOfOrderness - 1));
}
override fun onPeriodicEmit(output: WatermarkOutput?) {
}
}
水印策略:
class CustomWatermarkStrategy(
): WatermarkStrategy<Event> {
override fun createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context?): WatermarkGenerator<Event> {
return CustomWaterMarkGenerator(0)
}
override fun createTimestampAssigner(context: TimestampAssignerSupplier.Context?): TimestampAssigner<Event> {
return TimestampAssigner{ event: Event, _: Long->
event.timestamp
}
}
}
自定义来源:
sourceFunction 当前是一个连接到模拟流的 rsocket 连接,我可以在其中通过 mockStream.send(event) 发送事件。我对事件做的第一件事是使用映射函数(从字符串到我的事件类型)解析它们,然后分配我的水印等。
【问题讨论】:
标签: stream apache-flink data-stream windowing