【问题标题】:Spring kafka : Kafka Listener- consumer.seek issueSpring kafka:Kafka Listener-consumer.seek 问题
【发布时间】:2018-04-15 05:17:50
【问题描述】:

我们正在使用 Spring KafkaListener,它在处理到 DB 后确认每个记录。如果我们在写入数据库时​​遇到问题,我们不会确认记录,因此不会为消费者提交偏移量。这很好用。现在我们想在下一次轮询中获取失败的消息以重试它们。我们将错误处理程序添加到我们的侦听器并调用 ConsumerAwareListenerErrorHandler 并尝试对失败的消息偏移执行 consumer.seek()。期望在下一次轮询期间,我们应该收到失败的消息。这没有发生。下一次轮询仅获取新消息而不是失败消息代码 sn-p 如下所示。

@Service
public class KafkaConsumer {
       @KafkaListener(topics = ("${kafka.input.stream.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler = "listen3ErrorHandler")
      public void onMessage(ConsumerRecord<Integer, String> record,
            Acknowledgment acknowledgment ) throws Exception {

    try {

        msg = JaxbUtil.convertJsonStringToMsg(record.value());
        onHandList = DCMUtil.convertMsgToOnHandDTO(msg);

        TeradataDAO.updateData(onHandList);
        acknowledgment.acknowledge();
        recordSuccess = true;

        LOGGER.info("Message Saved in Teradata DB");

    }  catch (Exception e) {
        LOGGER.error("Error Processing On Hand Data ", e);
        recordSuccess = false;
    }

}

  @Bean
    public ConsumerAwareListenerErrorHandler listen3ErrorHandler() throws InterruptedException {
        return (message, exception, consumer) -> {
            this.listen3Exception = exception;
            MessageHeaders headers = message.getHeaders();
            consumer.seek(new org.apache.kafka.common.TopicPartition(
                          headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                          headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                          headers.get(KafkaHeaders.OFFSET, Long.class)); 
            return null;
        };
    }
}

    Container  Class

    @Bean
public Map<Object,Object> consumerConfigs() {
    Map<Object,Object>  props = new HashMap<Object,Object> ();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            localhost:9092);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-1");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory(consumerConfigs());
}

 @SuppressWarnings("unchecked")
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;
}

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    它应该像这样工作:

    如果您想丢弃上一次轮询中的其他记录,则错误处理程序需要引发异常。

    由于您正在“处理”错误,因此容器一无所知,并将继续使用轮询中的剩余记录调用侦听器。

    也就是说,我看到容器也忽略了错误处理程序抛出的异常(如果错误处理程序抛出 Error 而不是异常,它将丢弃)。我将为此打开一个问题。

    另一种解决方法是将Consumer 添加到侦听器方法签名中并在那里进行搜索(并引发异常)。如果没有错误处理程序,则丢弃批处理的其余部分。

    更正

    如果容器没有ErrorHandler,则ListenerErrorHandler抛出的任何Throwable都会导致剩余记录被丢弃。

    【讨论】:

    • 我尝试将消费者添加到我的侦听器中,并在异常 @KafkaListener(topics = ("${kafka.input.stream.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory") public void onMessage 中进行了搜索(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) throws Exception { } catch (Exception e) { consumer.seek(new org.apache.kafka.common.TopicPartition(record.topic(), record.partition( )), 记录偏移量());扔 e;我将错误处理程序视为 LoggingErrorHandler() 而不是 null
    • 不要把代码放在cmets中;很难阅读;改为编辑问题。是的;我开始在 github.com/spring-projects/spring-kafka/issues/470 上工作 - 希望在第二天左右有一个修复。
    • 我认为目前唯一的解决方法是使用ConsumerAwareErrorHandler(在容器中,而不是侦听器错误处理程序中)并抛出Error
    【解决方案2】:

    请尝试使用 SeekToCurrentErrorHandler。文档说“这允许实现查找所有未处理的主题/分区,以便下一次轮询将检索当前记录(以及剩余的其他记录)。SeekToCurrentErrorHandler 正是这样做的。

    容器将在调用错误处理程序之前提交任何未决的偏移提交。” https://docs.spring.io/autorepo/docs/spring-kafka-dist/2.1.0.BUILD-SNAPSHOT/reference/htmlsingle/#_seek_to_current_container_error_handlers

    【讨论】:

      猜你喜欢
      • 2018-11-11
      • 1970-01-01
      • 1970-01-01
      • 2011-11-14
      • 1970-01-01
      • 1970-01-01
      • 2019-07-09
      • 2018-10-26
      • 1970-01-01
      相关资源
      最近更新 更多