【问题标题】:What is the proper way of increasing the watermark when using a joinFunction in Flink?在Flink中使用joinFunction时增加水印的正确方法是什么?
【发布时间】:2021-12-23 06:41:45
【问题描述】:

我有两个流,流 A 和流 B。两个流都包含相同类型的事件,具有 ID 和时间戳。现在,我想要 flink 工作做的就是在 1 分钟的窗口内加入具有相同 ID 的事件。水印是在事件中分配的。

sourceA = initialSourceA.map(parseToEvent)
sourceB = initialSourceB.map(parseToEvent)

streamA = sourceA
                .assignTimestampsAndWatermarks(CustomWatermarkStrategy())
                .keyBy(Event.Key)

streamB = sourceB
                .assignTimestampsAndWatermarks(CustomWatermarkStrategy())
                .keyBy(Event.Key)


streamA
                .join(streamB)
                .where(Event.Key)
                .equalTo(Event.Key)
                .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MINUTES)))
                .apply(giveMePairOfEvents)
                .print()

在我的测试中,我尝试发送以下内容:

sourceA.send(Event(ID_1, 0 seconds))
sourceB.send(Event(ID_1, 0 seconds))

//to increase the watermark
sourceA.send(Event(ID_1, 62 seconds))
sourceB.send(Event(ID_1, 62 seconds)) 

对于并行度 = 1,我可以看到从时间 0 开始的事件被连接在一起。

但是,对于并行度 = 2,打印不会显示任何加入的内容。为了解决这个问题,我尝试在每个流的 keyBy 之后打印事件,我可以看到它们都在同一个实例上运行。在水印之后放置打印,原因很明显,事件当前在不同的实例上。

这让我相信在水印方面我做错了什么,因为对于高于 1 的并行度,它不会增加水印。所以这是我问自己的几个问题:

  • 是否每个事件都有单独的水印生成器,我必须专门增加它们?
  • 我是否先运行 keyBy,然后运行 ​​watermark,以便每个流中的事件使用相同的 watermarkgenerator?

发送另一组事件如下:

sourceA.send(Event(ID_1, 0 seconds))
sourceB.send(Event(ID_1, 0 seconds))

//to increase the watermark
sourceA.send(Event(ID_1, 62 seconds))
sourceB.send(Event(ID_1, 62 seconds)) 

sourceA.send(Event(ID_1, 122 seconds))
sourceB.send(Event(ID_1, 122 seconds))

最终发送加入的第一个事件。进一步检查表明,第三组事件使用了第二组未使用的相同水印生成器。我不太清楚为什么会发生的事情。在 Flink 中使用 join 函数时如何正确分配和增加水印?

编辑 1:

自定义水印生成器:

class CustomWaterMarkGenerator(
        private val maxOutOfOrderness: Long,
        private var currentMaxTimeStamp: Long = 0,
)
    : WatermarkGenerator<EventType> {
    override fun onEvent(event: EventType, eventTimestamp: Long, output: WatermarkOutput) {
        val a = currentMaxTimeStamp.coerceAtLeast(eventTimestamp)
        currentMaxTimeStamp = a
        output.emitWatermark(Watermark(currentMaxTimeStamp - maxOutOfOrderness - 1));
    }

    override fun onPeriodicEmit(output: WatermarkOutput?) {
    }
}

水印策略:


class CustomWatermarkStrategy(
): WatermarkStrategy<Event> {
    override fun createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context?): WatermarkGenerator<Event> {
        return CustomWaterMarkGenerator(0)
    }

    override fun createTimestampAssigner(context: TimestampAssignerSupplier.Context?): TimestampAssigner<Event> {
        return TimestampAssigner{ event: Event, _: Long->
            event.timestamp
        }
    }

}

自定义来源:

sourceFunction 当前是一个连接到模拟流的 rsocket 连接,我可以在其中通过 mockStream.send(event) 发送事件。我对事件做的第一件事是使用映射函数(从字符串到我的事件类型)解析它们,然后分配我的水印等。

【问题讨论】:

    标签: stream apache-flink data-stream windowing


    【解决方案1】:
    • 水印生成器的每个并行实例都将独立运行,仅基于其观察到的事件。在来源之后立即做水印是有意义的(虽然更好,一般来说,直接在来源中做水印)。

    • 具有多个输入通道的运算符(例如应用程序中的键控窗口连接)将其当前水印设置为从其活动输入通道接收到的水印的最小值。这具有任何空闲源实例将导致水印在下游任务中停止的效果——除非这些源明确地将自己标记为空闲。 (而FLINK-18934 表示在 Flink 1.14 之前,空闲传播无法与连接正常工作。)在您的情况下,空闲源可能是可疑的。

    • 调试此类问题的一个策略是调出 Flink WebUI 并观察当前水印在所有任务中的行为。

    要获得更多帮助,请分享应用程序的其余部分,或至少分享自定义源和水印策略。

    【讨论】:

    • 添加了带有水印策略和来源的编辑。
    • 所以作业图是源 -> 地图 -> 水印 -> 键控窗口连接 -> 打印。这 5 个流水线阶段的并行度是多少?
    • 我只将环境级别的并行度设置为2。
    • 另外请注意,如果我将并行度保持为默认值:第二组事件(发送 3 次)也不会加入任何内容。对我来说,似乎每组事件都存在于某个实例上(?)并等待下一组事件将水印推送到它们的实例上。
    • 我相信问题是这样的:每个水印生成器都有两个实例,但只有一个键,所以一个实例不处理任何事件,它的水印无法进行。这会阻止连接。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-12
    • 1970-01-01
    相关资源
    最近更新 更多