【问题标题】:How to properly handle errors while consuming messages?消费消息时如何正确处理错误?
【发布时间】:2016-10-03 15:54:05
【问题描述】:

我正在使用 spring-integration-kafka 使用消息,使用 message-driven-channel-adapter:

<int-kafka:message-driven-channel-adapter  
    id="kafkaListener"
    listener-container="container1"
    channel="outputFromKafka"
    error-channel="errorChannel"/>

容器使用JsonDeserializer 将传入的 JSON 反序列化为对象:

<beans:bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <beans:constructor-arg>
        <beans:bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <beans:constructor-arg>
                <beans:map>
                    <beans:entry key="bootstrap.servers" value="localhost:9092" />
                    <beans:entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <beans:entry key="group.id" value="mygroup" />
                </beans:map>
            </beans:constructor-arg>
           <beans:property name="valueDeserializer">
                <beans:bean class="org.springframework.kafka.support.serializer.JsonDeserializer">
                    <beans:constructor-arg value="com.foo.MyType"/>
                </beans:bean>
            </beans:property>
        </beans:bean>
    </beans:constructor-arg>
    <beans:constructor-arg>
        <beans:bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <beans:constructor-arg name="topics" value="foo" />
        </beans:bean>
    </beans:constructor-arg>
</beans:bean>

如果无法成功解析消息(例如因为消费者不小心使用了错误的类型),则抛出异常:

ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ...

在此之后,适配器再次收到相同的消息(可能是因为最后一个消息没有提交?),并且以完全相同的方式失败,导致无穷无尽的异常流。

看起来配置的error-channel 没有使用。

处理此类错误的选项有哪些,在 XML 中是如何配置的?

【问题讨论】:

    标签: spring-integration apache-kafka


    【解决方案1】:

    看起来配置的错误通道没有使用。

    是什么让你相信?

    我刚刚运行了一个测试,错误通道设置为errorChannel(默认设置,带有日志适配器)...

    12:06:08.366 [container-kafka-consumer-1] ERROR o.s.i.handler.LoggingHandler - ...
    

    您能否为org.springframework.integration 提供调试日志sn-p 显示失败?

    编辑

    哦,对不起……

    ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ...
    

    在 Spring Integration 收到消息之前,这在堆栈中要低得多。

    【讨论】:

    • 查看我的编辑 - 您需要自定义反序列化器以捕获异常并返回指示反序列化器失败的内容。即一些“特殊”MyType 实例。
    • 你是对的; org.apache.kafka.common.serialization.Deserializer 抛出的异常被 Kafka 客户端代码简单地捕获和记录,没有任何干预的可能性。使用反序列化类型的魔法实例在技术上是可行的,但违反了我的许多原则;)
    • 也许使用 Kafka 值反序列化器对于 JSON 来说不是一个好的选择。相反,我认为,StringDeserializer 和单独的 json 到对象转换器提供了更大的灵活性和稳健性。
    • 是的,然后errorChannel会得到异常。
    猜你喜欢
    • 2021-05-01
    • 2016-05-04
    • 1970-01-01
    • 2020-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-27
    • 1970-01-01
    相关资源
    最近更新 更多