【Question title】: How to catch deserialization error in Kafka-Spring?
【Posted】: 2019-09-19 06:57:42
【Question】:

I'm starting an application that consumes Kafka messages.

I followed the Spring docs on deserialization error handling to catch deserialization exceptions. I tried the failedDeserializationFunction approach.

This is my consumer configuration class:

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        
        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

This is the BiFunction provider:

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }

}

public class NTCBadMessageBody extends NTCMessageBody{

    private final byte[] failedDecode;

    public NTCBadMessageBody(byte[] failedDecode) {
        this.failedDecode = failedDecode;
    }

    public byte[] getFailedDecode() {
        return this.failedDecode;
    }

}

When I send a single malformed message to the topic, I get this error, repeated in a loop:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value

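My understanding is that ErrorHandlingDeserializer2 should fall back to the NTCBadMessageBody type and keep consuming. However, in debug mode I can see that execution never reaches the NTCBadMessageBody constructor.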

【Question discussion】:

    Tags: java spring apache-kafka


    【Solution 1】:

    Use ErrorHandlingDeserializer.

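    When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before poll() returns. To solve this, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and the raw bytes. When you use a record-level MessageListener, if the key or value contains a DeserializationException, the container's ErrorHandler is called with the failed ConsumerRecord. When you use a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the application listener's responsibility to check whether the key or value in a particular record is a DeserializationException.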
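To make the delegation idea concrete, here is a toy, dependency-free model of the behavior described above. It is not Spring's ErrorHandlingDeserializer. The names Decoder, Decoded and ErrorHandlingDecoder are hypothetical, and a trivial integer parser stands in for JsonDeserializer. The wrapper catches the delegate's exception and returns a value that carries the cause and the raw bytes, so nothing is thrown out of the poll path.

```java
import java.nio.charset.StandardCharsets;

public class ErrorHandlingDecoderDemo {

    /** Hypothetical stand-in for a value deserializer (NOT Kafka's Deserializer interface). */
    interface Decoder<T> {
        T decode(byte[] data);
    }

    /** Holds either a decoded value, or the failure cause plus the raw bytes that failed. */
    static final class Decoded<T> {
        final T value;
        final RuntimeException cause;
        final byte[] rawBytes;

        private Decoded(T value, RuntimeException cause, byte[] rawBytes) {
            this.value = value;
            this.cause = cause;
            this.rawBytes = rawBytes;
        }

        boolean failed() {
            return cause != null;
        }
    }

    /** Delegates to a real decoder and turns a thrown exception into data instead of propagating it. */
    static final class ErrorHandlingDecoder<T> implements Decoder<Decoded<T>> {
        private final Decoder<T> delegate;

        ErrorHandlingDecoder(Decoder<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Decoded<T> decode(byte[] data) {
            try {
                return new Decoded<>(delegate.decode(data), null, null);
            } catch (RuntimeException ex) {
                // Catching here keeps the caller's loop alive: it receives a value, not an exception
                return new Decoded<>(null, ex, data);
            }
        }
    }

    public static void main(String[] args) {
        // Toy delegate that rejects anything that is not an integer, like a strict JSON parser would
        Decoder<Integer> intDecoder = bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        ErrorHandlingDecoder<Integer> safe = new ErrorHandlingDecoder<>(intDecoder);

        Decoded<Integer> good = safe.decode("42".getBytes(StandardCharsets.UTF_8));
        Decoded<Integer> bad = safe.decode("not-a-number".getBytes(StandardCharsets.UTF_8));

        System.out.println(good.value);                                        // 42
        System.out.println(bad.failed());                                      // true
        System.out.println(new String(bad.rawBytes, StandardCharsets.UTF_8));  // not-a-number
    }
}
```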

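    You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects, and pass in ErrorHandlingDeserializer instances configured with the appropriate delegates. Alternatively, you can set consumer configuration properties that the ErrorHandlingDeserializer uses to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; the property value can be a class or a class name.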
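The rule that "the property value can be a class or a class name" can be sketched in plain Java, assuming a Map-based config. This only illustrates the idea and does not show how Spring resolves the property internally. The helper instantiate() and the key "demo.value.delegate" are hypothetical. The helper accepts either a Class<?> or a fully qualified name and creates the delegate reflectively through its no-arg constructor.

```java
import java.util.HashMap;
import java.util.Map;

public class DelegateResolver {

    // Hypothetical config key, mirroring the role of ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
    static final String VALUE_DELEGATE = "demo.value.delegate";

    /** Accepts a Class<?> or a fully qualified class name and returns a new instance of it. */
    static Object instantiate(Object classOrName) {
        try {
            Class<?> type;
            if (classOrName instanceof Class<?>) {
                type = (Class<?>) classOrName;
            } else if (classOrName instanceof String) {
                type = Class.forName((String) classOrName);
            } else {
                throw new IllegalArgumentException("Expected a Class or a class name, got " + classOrName);
            }
            // Requires a public no-arg constructor, as is typical for pluggable deserializers
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + classOrName, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();

        config.put(VALUE_DELEGATE, StringBuilder.class);   // value given as a Class
        System.out.println(instantiate(config.get(VALUE_DELEGATE)).getClass().getName());

        config.put(VALUE_DELEGATE, "java.util.ArrayList"); // value given as a class name
        System.out.println(instantiate(config.get(VALUE_DELEGATE)).getClass().getName());
    }
}
```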

    package com.mypackage.app.config;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import com.mypackage.app.model.kafka.message.KafkaEvent;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ListenerExecutionFailedException;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;
    
    import lombok.extern.slf4j.Slf4j;
    
    @EnableKafka
    @Configuration
    @Slf4j
    public class KafkaConsumerConfig {
    
        @Value("${kafka.bootstrap-servers}")
        private String servers;
    
        @Value("${listener.group-id}")
        private String groupId;
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {
        
            ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
    
            factory.setRetryTemplate(retryTemplate());
            factory.setErrorHandler(((exception, data) -> {
                /*
                 * Here you can do your custom handling. I am just logging, the same as the
                 * default error handler does. If you only want to log, you do not need to
                 * configure an error handler here; the default handler does it for you.
                 * Typically you would persist the failed records to a DB to keep track of them.
                 */
                log.error("Error in process with Exception {} and the record is {}", exception, data);
            }));
    
            return factory;
    
        }
    
        @Bean
        public ConsumerFactory<String, KafkaEvent> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
    
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
            config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
            config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
                    "com.mypackage.app.model.kafka.message.KafkaEvent");
            config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");
    
            return new DefaultKafkaConsumerFactory<>(config);
        }
    
        private RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
    
            /*
             * The retry policy sets the maximum number of attempts and which
             * exceptions should or should not be retried.
             */
            retryTemplate.setRetryPolicy(retryPolicy());
    
            return retryTemplate;
        }
    
        private SimpleRetryPolicy retryPolicy() {
            Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
    
            // the boolean value in the map determines whether exception should be retried
            exceptionMap.put(IllegalArgumentException.class, false);
            exceptionMap.put(TimeoutException.class, true);
            exceptionMap.put(ListenerExecutionFailedException.class, true);
    
            return new SimpleRetryPolicy(3, exceptionMap, true);
        }
    }
    

    【Discussion】:

      【Solution 2】:

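      The string-splitting approach in the other answer can break if the topic name contains a character such as "-". So I rewrote the same logic using a regular expression.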

          import java.util.List;
          import java.util.regex.Matcher;
          import java.util.regex.Pattern;
          
          import org.apache.kafka.clients.consumer.Consumer;
          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.common.TopicPartition;
          import org.apache.kafka.common.errors.SerializationException;
          import org.springframework.kafka.listener.ErrorHandler;
          import org.springframework.kafka.listener.MessageListenerContainer;
          
          import lombok.extern.slf4j.Slf4j;
          
          @Slf4j
          public class KafkaErrHandler implements ErrorHandler {
          
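              /**
               * Seeks past the record that failed deserialization, so the consumer
               * does not keep failing on the same offset.
               *
               * @param e        the exception whose message contains the topic-partition and offset
               * @param consumer the consumer to reposition
               */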
              private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
                  String p = ".*partition (.*) at offset ([0-9]*).*";
                  Pattern r = Pattern.compile(p);
          
                  Matcher m = r.matcher(e.getMessage());
          
                  if (m.find()) {
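                      // group(1) is "<topic>-<partition>": split at the LAST '-' so topic names containing '-' survive
                      int idx = m.group(1).lastIndexOf("-");
                      String topics = m.group(1).substring(0, idx);
                      // idx + 1 skips the '-' itself; otherwise "-1" would parse as a negative partition
                      int partition = Integer.parseInt(m.group(1).substring(idx + 1));
                      // Kafka offsets are 64-bit
                      long offset = Long.parseLong(m.group(2));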
          
                      TopicPartition topicPartition = new TopicPartition(topics, partition);
          
                      consumer.seek(topicPartition, (offset + 1));
          
                      log.info("Skipped message with offset {} from partition {}", offset, partition);
                  }
              }
          
              @Override
              public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
                  log.error("Error in process with Exception {} and the record is {}", e, record);
          
                  if (e instanceof SerializationException)
                      seekSerializeException(e, consumer);
              }
          
              @Override
              public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                      MessageListenerContainer container) {
                  log.error("Error in process with Exception {} and the records are {}", e, records);
          
                  if (e instanceof SerializationException)
                      seekSerializeException(e, consumer);
          
              }
          
              @Override
              public void handle(Exception e, ConsumerRecord<?, ?> record) {
                  log.error("Error in process with Exception {} and the record is {}", e, record);
              }
          
          } 
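The core of this workaround is extracting the topic, partition, and offset from a message of the form quoted in these answers ("Error deserializing key/value for partition <topic>-<partition> at offset <offset>. ..."). A standalone, testable sketch of just that parsing step might look like the block below. The class FailedRecordLocator and its Location holder are hypothetical names. The greedy group in front of -(\d+) makes the regex split at the last '-', and the offset is read as a long. The sketch still depends on Kafka's exact message wording, so the caveat about parsing error strings applies to it as well.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FailedRecordLocator {

    /** Topic, partition and offset recovered from the exception message. */
    static final class Location {
        final String topic;
        final int partition;
        final long offset;

        Location(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The greedy (.+) backtracks to the LAST "-<digits> at offset", so "orders-eu-3" yields topic "orders-eu"
    private static final Pattern MESSAGE = Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)");

    /** Returns null when the message is missing or does not have the expected shape. */
    static Location parse(String message) {
        if (message == null) {
            return null;
        }
        Matcher m = MESSAGE.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new Location(m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3)));
    }

    public static void main(String[] args) {
        Location loc = parse("Error deserializing key/value for partition orders-eu-3 at offset 17."
                + " If needed, please seek past the record to continue consumption.");
        // A consumer would then call seek(new TopicPartition(loc.topic, loc.partition), loc.offset + 1)
        System.out.println(loc.topic + " " + loc.partition + " " + loc.offset); // orders-eu 3 17
    }
}
```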
      

      Finally, register the error handler in the configuration:

       @Bean
      public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {
      
          ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(macdStatusConsumerFactory());
          factory.setRetryTemplate(retryTemplate());
          factory.setErrorHandler(new KafkaErrHandler());
      
          return factory;
      }
      

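      However, parsing the error string to get the topic, partition, and offset is not recommended. If anyone has a better solution, please post it here.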

      【Discussion】:

      • Your solution is better than the accepted one. I changed int partition = Integer.parseInt(m.group(1).substring(idx)); to int partition = Integer.parseInt(m.group(1).substring(idx+1)); to avoid negative numbers.
      【Solution 3】:

      ErrorHandlingDeserializer

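      When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before poll() returns. To solve this, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and the raw bytes. When you use a record-level MessageListener, if the key or value contains a DeserializationException, the container's ErrorHandler is called with the failed ConsumerRecord. When you use a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the application listener's responsibility to check whether the key or value in a particular record is a DeserializationException.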

      Judging by your code, you are using a record-level MessageListener, so you just need to add an ErrorHandler to the container.
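The record-level behavior described above can be modeled without Spring or Kafka. In this sketch, the PolledRecord type and the dispatch method are hypothetical. Good records go to the listener. A record that carries a deserialization failure goes to the error handler together with its topic, partition, and offset. The loop then continues with the next record instead of failing forever on the bad one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class RecordLevelDispatchDemo {

    /** Hypothetical polled record: its coordinates plus either a value or a deserialization failure. */
    static final class PolledRecord {
        final String topic;
        final int partition;
        final long offset;
        final String value;       // null when deserialization failed
        final Exception failure;  // non-null when deserialization failed

        PolledRecord(String topic, int partition, long offset, String value, Exception failure) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
            this.failure = failure;
        }
    }

    /** Routes good records to the listener and failed ones to the error handler, then keeps going. */
    static void dispatch(List<PolledRecord> batch,
                         Consumer<String> listener,
                         BiConsumer<Exception, PolledRecord> errorHandler) {
        for (PolledRecord r : batch) {
            if (r.failure != null) {
                // The handler receives the whole record, so topic/partition/offset need no string parsing
                errorHandler.accept(r.failure, r);
            } else {
                listener.accept(r.value);
            }
        }
    }

    public static void main(String[] args) {
        List<PolledRecord> batch = List.of(
                new PolledRecord("orders", 0, 10, "a", null),
                new PolledRecord("orders", 0, 11, null, new IllegalStateException("bad JSON")),
                new PolledRecord("orders", 0, 12, "b", null));

        List<String> consumed = new ArrayList<>();
        List<Long> failedOffsets = new ArrayList<>();
        dispatch(batch, consumed::add, (ex, rec) -> failedOffsets.add(rec.offset));

        System.out.println(consumed);       // [a, b]
        System.out.println(failedOffsets);  // [11]
    }
}
```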

      Handling Exceptions

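      If your error handler implements ConsumerAwareListenerErrorHandler, you can, for example, adjust the offsets accordingly. To reset the offset so that the failed message is replayed, you could do something like the following. Note, however, that these are simplistic implementations, and you would probably want more checks in the error handler.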

      @Bean
      public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
          return (m, e, c) -> {
              // The headers of the failed message carry its topic, partition and offset
              MessageHeaders headers = m.getHeaders();
              // Seek back to the failed record so it is redelivered on the next poll
              c.seek(new org.apache.kafka.common.TopicPartition(
                      headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                      headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                      headers.get(KafkaHeaders.OFFSET, Long.class));
              return null;
          };
      }
      

      Or you can write a custom implementation, as in this example:

      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
      kafkaListenerContainerFactory()  {
      
          ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
                  = new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          factory.setErrorHandler(new ErrorHandler() {
              @Override
              public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
                  seekPastFailedRecord(thrownException, consumer);
              }
      
              @Override
              public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
                  // Required by the interface; no Consumer is available here, so there is nothing to seek
              }
      
              @Override
              public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
                  seekPastFailedRecord(e, consumer);
              }
      
              // Parses "Error deserializing key/value for partition <topic>-<partition> at offset <offset>. ..."
              // and seeks one past the bad record. Note: topic names containing '-' are not handled.
              private void seekPastFailedRecord(Exception e, Consumer<?, ?> consumer) {
                  String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                  String topics = s.split("-")[0];
                  long offset = Long.parseLong(s.split("offset ")[1]);
                  int partition = Integer.parseInt(s.split("-")[1].split(" at")[0]);
      
                  TopicPartition topicPartition = new TopicPartition(topics, partition);
                  consumer.seek(topicPartition, offset + 1);
                  System.out.println("Skipping " + topics + "-" + partition + " offset " + offset);
              }
          });
      
      
          return factory;
      }
      

      【Discussion】:

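      • I'm trying this, but there seems to be a problem in your second example: I get The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties
      • After changing it to factory.setErrorHandler(new ErrorHandler() it works fine! Thanks a lot!
      • For any given exception, is there a way to access the partition information (actually the TopicPartition) in the custom implementation above? We want to catch exceptions, log them to a DB, and then advance the offset on the partition. But by parsing the exception message as above, we can only do this for SerializationException.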