【问题标题】:How to Commit Kafka Offsets Manually in Flink如何在 Flink 中手动提交 Kafka 偏移量
【发布时间】:2021-04-06 04:21:20
【问题描述】:

我有一个 Flink 作业来使用 Kafka 主题并将其下沉到另一个主题,并且 Flink 作业设置为 auto.commit,间隔为 3 分钟(禁用检查点),但在监控方面,有 3 分钟的延迟.但是我们想实时监控处理过程,没有 3 分钟的延迟,所以我们希望有一个功能,FlinkKafkaConsumer 能够在接收器功能后立即提交偏移量。
有没有办法在 Flink 框架内实现这个目标?
还是有其他选择?

在第 53 行,我正在尝试创建一个 KafkaConsumer 实例来调用 commitSync() 函数以使其工作,但它不起作用。

    public class CEPJobTest {
        private final static String TOPIC = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
        public static void main(String[] args) throws Exception {
    
            System.out.println("start cep test job...");
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("zookeeper.connect", "localhost:2181");
            properties.setProperty("group.id", "console-consumer-cep");
            properties.setProperty("enable.auto.commit", "false");
            // offset interval
            //properties.setProperty("auto.commit.interval.ms", "500");
    
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
                    properties);
    
            //set commitoffset by checkpoint
            consumer.setCommitOffsetsOnCheckpoints(false);
            System.out.println("checkpoint enabled:"+consumer.getEnableCommitOnCheckpoints());
    
            DataStream<String> stream = env.addSource(consumer);
    
            stream.map(new MapFunction<String, String>() {
    
                @Override
                public String map(String value) throws Exception {
                    return new Date().toString() + ":  " + value;
                }
    
            }).print();
           
            //here, I want to commit offset manually after processing message...
    
            KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer(properties);
            kafkaConsumer.commitSync();
            env.execute("Flink Streaming");
    
    
    
        }
        private static Consumer<Long, String> createConsumer() {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
            return consumer;
        }
    }

【问题讨论】:

    标签: apache-kafka apache-flink


    【解决方案1】:

    这不像你的代码那样工作

    env.execute是提交作业到集群,然后提交执行。此行之前的代码只是构建作业图,而不是执行任何操作。

    要在 sink 之后做这个,你应该把它放在你的 sink 函数中

    class mySink extends RichSinkFunction {
      override def invoke(...) = {
        val kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.commitSync();
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-06-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-18
      • 2015-04-09
      相关资源
      最近更新 更多