【问题标题】:Proper way to assign watermark with DateStreamSource<List<T>> using Flink使用 Flink 使用 DateStreamSource<List<T>> 分配水印的正确方法
【发布时间】:2018-11-23 18:03:35
【问题描述】:

我有一个持续的 JSONArray 数据生成到 Kafka 主题,我想处理具有 EventTime 特征的记录。为了达到这个目标,我必须为 JSONArray 中包含的每条记录分配水印。

我没有找到方便的方法来实现这个目标。我的解决方案是从 DataStreamSource 消费数据>,然后使用匿名 ProcessFunction 迭代 List 并收集 Object 到下游,最后将水印分配给 this 下游。

主要代码如下:

DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env); SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource .process(new ProcessFunction<List<MockData>, MockData>() { @Override public void processElement(List<MockData> value, Context ctx, Collector<MockData> out) throws Exception { value.forEach(mockData -> out.collect(mockData)); } }); convertToPojo.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) { @Override public long extractTimestamp(MockData element) { return element.getTimestamp(); } }); SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo .keyBy("country").window( SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10))) .process( new FlinkEventTimeCountFunction()).name("count elements");

代码看起来没问题,运行也没有错误。但是 ProcessWindowFunction 从未触发。我跟踪 Flink 源代码,发现 EventTimeTrigger 永远不会返回 TriggerResult.FIRE,因为 TriggerContext.getCurrentWatermark 一直返回 Long.MIN_VALUE。

在事件时间处理列表的正确方法是什么?任何建议将不胜感激。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    问题在于您将 keyBy 和窗口操作应用于 convertToPojo 流,而不是带有时间戳和水印(您没有分配给变量)的流。

    如果您或多或少这样编写代码,它应该可以工作:

    listDataStreamSource = KafkaSource ...
    convertToPojo = listDataStreamSource.process ...
    pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
    countStream = pojoPlusWatermarks.keyBy ...
    

    在 convertToPojo 流上调用 assignTimestampsAndWatermarks 不会修改该流,而是创建一个包含时间戳和水印的新数据流对象。您需要将窗口应用到该新数据流。

    【讨论】:

    • 很高兴见到你,感谢你的及时回复。我不太明白,这是否意味着用户不能为源流以外的下游分配水印。我发现 SourceFunction 有collectWithTimestamp 但在 Collector 中没有。
    • 非常感谢!!!您的精确解释启发了我,按照您的指南和伪代码,它现在运行良好。
    猜你喜欢
    • 2021-12-23
    • 1970-01-01
    • 2016-05-18
    • 1970-01-01
    • 2018-01-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多