【问题标题】:DLT not being created when using SeekToCurrentErrorHandler and DeadLetterPublishingRecoverer for de-serialization failures使用 SeekToCurrentErrorHandler 和 DeadLetterPublishingRecoverer 进行反序列化失败时未创建 DLT
【发布时间】:2019-07-29 22:02:22
【问题描述】:

这是我的第一个 Spring Boot、Kafka 项目和我的第一个 Stack Overflow 帖子。

我正在使用 Spring Boot 2.1.1 和 spring-kafka 2.2.7.RELEASE。我正在尝试使用 DeadLetterPublishingRecoverer 配置 Spring SeekToCurrentErrorHandler 以发送 de-序列化失败消息到不同的主题。未创建新的 DLT 队列。

虽然我能够在应用程序日志/IDE 控制台中看到由于反序列化失败而导致的错误消息(并在手动输入主题时处理后续消息),但未创建“originalTopic.DLT”主题因此不正确的消息不会写入 .DLT 主题。我在 Spring 文档中读到“默认情况下,死信记录被发送到名为 originalTopic.DLT 的主题(原始主题名称以 .DLT 为后缀)并发送到与原始记录相同的分区”

相反,我在日志文件 (.log) 中看到失败消息以及 @KafkaListner 注释中列出的主题的有效消息。

我正在尝试将错误消息按原样写入 .DLT 主题以进行进一步的错误处理。

这是我目前的配置。任何关于我哪里出错的方向都会非常有帮助。

我参考了以下链接https://docs.spring.io/spring-kafka/reference/html/#serdesConfiguring Spring Kafka to use DeadLetterPublishingRecovererSeekToCurrentErrorHandler: DeadLetterPublishingRecoverer is not handling deserialize errors 来找出解决方案。但我面临的问题是 .DLT 没有被创建。

@EnableKafka
@Configuration
@ConditionalOnMissingBean(type = "org.springframework.kafka.core.KafkaTemplate")
public class SubscriberConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "java.lang.String");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.sample.main.entity.Transaction");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(Transaction.class, false));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, Transaction> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate),3));
      return factory;

    @KafkaListener(topics = "${spring.kafka.subscription.topic}", groupId="json")
    public void consume(@Payload Transaction message, @Headers MessageHeaders headers) {
    //Business Logic...... 
    this.sendMsgToNewTopic(newTopicName, transformedTrans);

}
}
}

Console output is 2019-07-29 15:28:03 ERROR LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = trisyntrans, partition = 0, offset = 10, CreateTime = 1564432082456, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = this is failed deserialization)
org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'this': was expecting 'null', 'true', 'false' or NaN
 at [Source: (String)"this is failed deserialization"; line: 1, column: 5]
    at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:128)
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:132)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:264)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:74)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1275)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1258)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1219)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1200)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1120)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:935)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:751)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'this': was expecting 'null', 'true', 'false' or NaN
 at [Source: (String)"this is failed deserialization"; line: 1, column: 5]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2839)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2817)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchToken(ReaderBasedJsonParser.java:2606)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchTrue(ReaderBasedJsonParser.java:2558)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:717)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3042)
    at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:125)
    ... 15 more

不合格消息的示例可以是一个简单的字符串,例如“这是一条测试消息”

【问题讨论】:

  • 您是否设法在 .DLT 中获取原始有效负载(原样)?
  • 不,我做不到。

标签: spring-kafka


【解决方案1】:

您必须自己创建 DLT 主题。

如果您将 bean 添加到应用程序上下文中,框架将为您完成这项工作

@Bean
public NewTopic dlt(@Value("${spring.kafka.subscription.topic}" String mainTopic) {
    return new NewTopic(mainTopic + ".DLT", 10, (short) 3);
}

只要在应用程序上下文中有一个KafkaAdmin@Bean(如果您使用的是 Spring Boot,则会为您自动配置一个)。

【讨论】:

  • 谢谢加里。尽管添加了建议的 sn-p 行为并没有改变。 .DLT 仍未创建。它是否与 DeadLetterPublishingRecoverer 中使用的 @ConditionalOnMissingBean(type = "org.springframework.kafka.core.KafkaTemplate") 和 KafkaTemplate kafkaTemplate 无法创建 producerFactory 有任何关系。
  • 否;那是不相关的;该主题由KafkaAdmin bean 声明。它会查找所有 NewTopic @Beans 并自动配置它们。
  • 嗨,加里,我刚刚发现我的 DLT 的创建与您在上面的答案中描述的完全一样。即Spring Boot 如果我自己没有专门创建 DLT,将为我自动配置 1。唯一的“问题”是Spring Boot 将创建只有 1 个副本的 DLT,无论我用于非 DLT 主题的副本数量如何。我想这是设计使然吧?谢谢。
  • 不要对旧答案提出新问题。您可以在 NewTopic @Bean 中指定所需的副本数。如果没有 bean,“Spring Boot”将不会创建主题,但是。您的代理可能配置为自动创建主题(具有代理上定义的属性)。
猜你喜欢
  • 1970-01-01
  • 2020-11-16
  • 1970-01-01
  • 2017-08-22
  • 1970-01-01
  • 2012-12-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多