【发布时间】:2021-04-06 04:21:20
【问题描述】:
我有一个 Flink 作业来使用 Kafka 主题并将其下沉到另一个主题,并且 Flink 作业设置为 auto.commit,间隔为 3 分钟(禁用检查点),但在监控方面,有 3 分钟的延迟.但是我们想实时监控处理过程,没有 3 分钟的延迟,所以我们希望有一个功能,FlinkKafkaConsumer 能够在接收器功能后立即提交偏移量。
有没有办法在 Flink 框架内实现这个目标?
还是有其他选择?
在第 53 行,我正在尝试创建一个 KafkaConsumer 实例来调用 commitSync() 函数以使其工作,但它不起作用。
public class CEPJobTest {
private final static String TOPIC = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) throws Exception {
System.out.println("start cep test job...");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "console-consumer-cep");
properties.setProperty("enable.auto.commit", "false");
// offset interval
//properties.setProperty("auto.commit.interval.ms", "500");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
properties);
//set commitoffset by checkpoint
consumer.setCommitOffsetsOnCheckpoints(false);
System.out.println("checkpoint enabled:"+consumer.getEnableCommitOnCheckpoints());
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return new Date().toString() + ": " + value;
}
}).print();
//here, I want to commit offset manually after processing message...
KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.commitSync();
env.execute("Flink Streaming");
}
private static Consumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
}
【问题讨论】: