【发布时间】:2018-02-01 11:07:57
【问题描述】:
我想从 Kafka 接收最新的数据到 Flink 程序,但是 Flink 正在读取历史数据。
我已将auto.offset.reset 设置为latest 如下所示,但它不起作用
properties.setProperty("auto.offset.reset", "latest");
Flink Programm 正在使用以下代码从 Kafka 接收数据
//getting stream from Kafka and giving it assignTimestampsAndWatermarks
DataStream<JoinedStreamEvent> raw_stream = envrionment.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new JoinSchema(), properties)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
我正在关注关于 https://issues.apache.org/jira/browse/FLINK-4280 ,建议按照下面提到的方式添加源
Properties props = new Properties();
...
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setEnableCommitOffsets(boolean); // if true, commits on checkpoint if checkpointing is enabled, otherwise, periodically.
kafka.setForwardMetrics(boolean);
...
env.addSource(kafka)
我也这样做了,但是我无法访问setStartFromLatest()
FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09<JoinedStreamEvent>( "test", new JoinSchema(),properties);
我应该怎么做才能接收发送到的最新值 Kafka 而不是从历史中接收价值?
【问题讨论】:
标签: apache-kafka apache-flink kafka-producer-api flink-streaming flink-cep