[Posted]: 2021-07-11 21:47:33
[Question]:
- spring-boot 2.5.2
- spring-cloud Hoxton.SR12
- spring-kafka 2.6.7 (downgraded because of this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079)
I am following this recipe for handling deserialization errors: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc
I created the beans mentioned in the recipe above as follows:
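import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
@Slf4j
public class ErrorHandlingConfig {

    // Installs the error handler on every listener container the binder creates.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    // Hands failed records to the recoverer once retries are exhausted.
    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }

    // Publishes failed records to a dead-letter topic (no destination resolver is supplied here).
    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }
}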
Configuration file:
spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeDecoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          configuration:
            application:
              security: PLAINTEXT
        bindings:
          myInboundRoute:
            consumer:
              autoCommitOffset: true
              startOffset: latest
              enableDlq: true
              dlqName: my-dql.poison
              dlqProducerProperties:
                configuration:
                  value.serializer: myapp.serde.MyCustomSerializer
              configuration:
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
          myOutboundRoute:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: myapp.serde.MyCustomSerializer
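I expected the DLT to be named my-dql.poison. That topic is in fact created fine, but a second topic named some-destination.1.DLT is also created automatically.
Why is this topic created in addition to the one I named with dlqName in the configuration?
What am I doing wrong? When I poll for messages, they are in the auto-created some-destination.1.DLT, not in the topic named by my dlqName.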
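One likely explanation, offered as a sketch rather than a confirmed diagnosis: a DeadLetterPublishingRecoverer built with only a template resolves its destination with spring-kafka's default rule, which is the original topic name plus ".DLT" on the same partition. That rule does not read the binder's dlqName property, so the binder may provision my-dql.poison while the recoverer publishes elsewhere. The plain-Java model below illustrates only that resolution step. FailedRecord, Destination, and fixedTopic are hypothetical stand-ins invented for this example, not spring-kafka or Kafka classes.

```java
import java.util.function.BiFunction;

// Plain-Java model of dead-letter destination resolution.
// This is NOT spring-kafka code; all types here are hypothetical stand-ins.
public class DlqRoutingSketch {

    /** Hypothetical stand-in for the failed consumer record (topic and partition only). */
    static final class FailedRecord {
        final String topic;
        final int partition;

        FailedRecord(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Hypothetical stand-in for a topic/partition pair. */
    static final class Destination {
        final String topic;
        final int partition;

        Destination(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Models the default rule: original topic + ".DLT", same partition.
    static final BiFunction<FailedRecord, Exception, Destination> DEFAULT_RESOLVER =
            (rec, ex) -> new Destination(rec.topic + ".DLT", rec.partition);

    // Models a custom rule: always the given topic; a negative partition
    // stands for "let the producer choose the partition".
    static BiFunction<FailedRecord, Exception, Destination> fixedTopic(String dlqName) {
        return (rec, ex) -> new Destination(dlqName, -1);
    }

    public static void main(String[] args) {
        FailedRecord rec = new FailedRecord("some-destination.1", 0);
        Exception cause = new IllegalStateException("deserialization failed");

        // prints "some-destination.1.DLT"
        System.out.println(DEFAULT_RESOLVER.apply(rec, cause).topic);
        // prints "my-dql.poison"
        System.out.println(fixedTopic("my-dql.poison").apply(rec, cause).topic);
    }
}
```

In the Spring configuration, the corresponding change would presumably be to pass a resolver as the second constructor argument, for example new DeadLetterPublishingRecoverer(bytesTemplate, (rec, ex) -> new TopicPartition("my-dql.poison", -1)). Whether enableDlq should then stay enabled on the binder side is not something the question settles.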
[Discussion]:
Tags: spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka