【问题标题】:Kafka : Consumer api : Failing to read and acknowledge manually from offset with kafka-consumer-apiKafka:消费者 api:无法使用 kafka-consumer-api 从偏移量手动读取和确认
【发布时间】:2020-05-07 18:25:07
【问题描述】:

我的用例是使用 kafka consumer-api,以便我们可以手动从 kafka-topic 中读取最后成功处理的数据的偏移量,然后手动确认 Kafka 的成功处理数据。 (这是为了减少数据丢失)。但是,在我当前的实现中,即使 我注释掉了 'ack.acknowledge()',程序也会向前移动并从下一个偏移量读取。我是 Kafka 的新手,并通过以下方式实现了我的消费者(我们使用的是 spring boot)

问题是:即使我注释掉 ack.acknowledge(),偏移量仍在更新,消费者正在读取下一个偏移量,这是出乎意料的(到目前为止我的理解)

Consumer Config [请注意我将 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false 并设置 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)]:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private AdapterStreamProperties appProperties;

    @Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
    private String receiveBufferBytes;

    @Bean
    public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "adapter");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

那么我的消费者就是这样消费的[即使我注释掉 'ack.acknowledge()' ,它下次仍然从下一个偏移量读取:

  @KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
  public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                     @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {

    System.out.println("----------------------------------------------------------------------------");
    System.out.println("Reading from Offset: " +  offset + ", and Partition: " + partition);
    System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " +   record.value());
    System.out.println("----------------------------------------------------------------------------");
    NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());

    if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
      kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
    }
    else {
      kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
    }
    ack.acknowledge();
  }

我的 gradle.build 中的 kafka api 版本:

//Kafka Dependencie
implementation      'org.apache.kafka:kafka-streams:2.0.1'
implementation      'org.apache.kafka:kafka-clients:2.0.1'

任何见解都会有所帮助。

提前致谢

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:
    ack.acknowledge()
    

    仅表示您提交,您的消费者已成功处理该消息。

    这将更新消费者组偏移量,如果您的应用程序在没有 ack 的情况下停止,则当前偏移量将不会提交给 kafka(您已处理且未确认的最后一条消息)。

    具有相同消费者组(以及相同分配的分区)的新消费者将再次使用相同的消息,因为应用程序将从消费者组偏移量中读取起点。

    如果你确认或不确认对你当前的监听器没有影响,它会简单地继续处理新消息 (只要没有错误发生,我猜重新平衡也可能是偏移量重新加载的来源(不完全确定))

    【讨论】:

    • 问题是:即使我注释掉 ack.acknowledge(),偏移量仍在更新,消费者正在读取下一个偏移量,这是出乎意料的(到目前为止我的理解)
    • 见我回答的最后一部分。您的消费者仍将继续使用新消息。但是,如果您的应用程序 f.e.具有相同消费者组的新消费者崩溃将在最后一个确认的消息之后继续。因此,在您不确认任何消息的情况下,您的场景可能存在很大差距。
    • 感谢您的解释
    猜你喜欢
    • 1970-01-01
    • 2019-01-18
    • 1970-01-01
    • 2016-09-09
    • 2016-03-11
    • 2018-11-23
    • 1970-01-01
    • 2017-01-20
    • 1970-01-01
    相关资源
    最近更新 更多