【发布时间】:2018-11-23 18:03:35
【问题描述】:
我有一个持续的 JSONArray 数据生成到 Kafka 主题,我想处理具有 EventTime 特征的记录。为了达到这个目标,我必须为 JSONArray 中包含的每条记录分配水印。
我没有找到方便的方法来实现这个目标。我的解决方案是从 DataStreamSource 消费数据>,然后使用匿名 ProcessFunction 迭代 List 并收集 Object 到下游,最后将水印分配给 this 下游。
主要代码如下:
DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);
SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
.process(new ProcessFunction<List<MockData>, MockData>() {
@Override
public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
throws Exception {
value.forEach(mockData -> out.collect(mockData));
}
});
convertToPojo.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
@Override
public long extractTimestamp(MockData element) {
return element.getTimestamp();
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
.keyBy("country").window(
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
.process(
new FlinkEventTimeCountFunction()).name("count elements");
代码看起来没问题,运行也没有错误。但是 ProcessWindowFunction 从未触发。我跟踪 Flink 源代码,发现 EventTimeTrigger 永远不会返回 TriggerResult.FIRE,因为 TriggerContext.getCurrentWatermark 一直返回 Long.MIN_VALUE。
在事件时间处理列表的正确方法是什么?任何建议将不胜感激。
【问题讨论】:
标签: apache-flink flink-streaming