【发布时间】:2016-10-03 15:54:05
【问题描述】:
我正在使用 spring-integration-kafka 使用消息,使用 message-driven-channel-adapter:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
channel="outputFromKafka"
error-channel="errorChannel"/>
容器使用JsonDeserializer 将传入的 JSON 反序列化为对象:
<beans:bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<beans:constructor-arg>
<beans:bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<beans:constructor-arg>
<beans:map>
<beans:entry key="bootstrap.servers" value="localhost:9092" />
<beans:entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
<beans:entry key="group.id" value="mygroup" />
</beans:map>
</beans:constructor-arg>
<beans:property name="valueDeserializer">
<beans:bean class="org.springframework.kafka.support.serializer.JsonDeserializer">
<beans:constructor-arg value="com.foo.MyType"/>
</beans:bean>
</beans:property>
</beans:bean>
</beans:constructor-arg>
<beans:constructor-arg>
<beans:bean class="org.springframework.kafka.listener.config.ContainerProperties">
<beans:constructor-arg name="topics" value="foo" />
</beans:bean>
</beans:constructor-arg>
</beans:bean>
如果无法成功解析消息(例如因为消费者不小心使用了错误的类型),则抛出异常:
ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ...
在此之后,适配器再次收到相同的消息(可能是因为最后一个消息没有提交?),并且以完全相同的方式失败,导致无穷无尽的异常流。
看起来配置的error-channel 没有使用。
处理此类错误的选项有哪些,在 XML 中是如何配置的?
【问题讨论】:
标签: spring-integration apache-kafka