【发布时间】:2021-02-20 09:09:03
【问题描述】:
在 Spring Cloud Stream 批处理模式下使用 KafkAvroSerialized 消息时遇到以下错误消息。这之前在非批处理模式下工作正常。
以下是将现有应用程序转换为批处理模式所做的仅有的两个更改 -
-
在 application.yml 中启用属性批处理模式:true 2) 修改 List
> 的参数,以前是 Message Error Message : org.springframework.messaging.MessageException: Exception thrown while invoking MyConsumer#consume[1 args]; nested exception is java.lang.ClassCastException: SomeClass cannot be cast to org.springframework.messaging.Message
请帮助解决问题。
【问题讨论】:
-
我注意到的区别是以前在非批处理模式下,我曾经接收带有 kafka 标头的 GenericMessage,但在批处理模式下,我直接接收没有 Kafka 标头的 Payload
-
我看到@StreamListener 不适用于批处理模式。我指的是这里提到的例子github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/…。但是,我不确定 Spring 是如何知道在多个绑定器从同一应用程序中的多个主题消耗的情况下调用哪个方法的。
-
我可以使用 @StreamListener 接收消息。但我面临的唯一问题是我正在丢失 Kafka 标头。我直接得到有效载荷而不是 GenericMessage 了。
-
我遇到了类似的问题,你的解决方案有效吗? stackoverflow.com/questions/67565589/…
标签: apache-kafka spring-kafka spring-cloud-stream