【问题标题】:Kafka Spring Cloud Stream Batch Consumption - Unable to cast class to MessageKafka Spring Cloud Stream批量消费 - 无法将类转换为消息
【发布时间】:2021-02-20 09:09:03
【问题描述】:

在 Spring Cloud Stream 批处理模式下使用 KafkAvroSerialized 消息时遇到以下错误消息。这之前在非批处理模式下工作正常。

以下是将现有应用程序转换为批处理模式所做的仅有的两个更改 -

  1. 在 application.yml 中启用属性批处理模式:true 2) 修改 List> 的参数,以前是 Message

    Error Message : org.springframework.messaging.MessageException: Exception thrown while invoking MyConsumer#consume[1 args]; nested exception is java.lang.ClassCastException: SomeClass cannot be cast to org.springframework.messaging.Message

请帮助解决问题。

【问题讨论】:

  • 我注意到的区别是以前在非批处理模式下,我曾经接收带有 kafka 标头的 GenericMessage,但在批处理模式下,我直接接收没有 Kafka 标头的 Payload
  • 我看到@StreamListener 不适用于批处理模式。我指的是这里提到的例子github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/…。但是,我不确定 Spring 是如何知道在多个绑定器从同一应用程序中的多个主题消耗的情况下调用哪个方法的。
  • 我可以使用 @StreamListener 接收消息。但我面临的唯一问题是我正在丢失 Kafka 标头。我直接得到有效载荷而不是 GenericMessage 了。
  • 我遇到了类似的问题,你的解决方案有效吗? stackoverflow.com/questions/67565589/…

标签: apache-kafka spring-kafka spring-cloud-stream


【解决方案1】:

我已经设法以批处理模式获取标题,但它们被包装在列表中。

如果你需要访问元数据和其他头文件,你不能使用@StreamListener,你需要使用函数式编程方法编写你的消费者函数,例如:

@Bean
Consumer<Message<List<String>>> input() {
    return list -> {
        System.out.println(list);
    };
}

并添加spring.cloud.function.definition=input(你的函数名),同样设置绑定到input(你的函数名)-in-0 比如

spring:
  profiles: dev
  cloud:
    stream:
      function:
        definition: input
      bindings:
        input-in-0:
          destination: yourDestination
          group: yourGroup

参考:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names

https://cloud.spring.io/spring-cloud-stream/multi/multi__programming_model.html

【讨论】:

    【解决方案2】:

    我也能够达到可以使以下块正常工作的地步,

    @Bean
    Consumer<Message<List<Foo>>> consumer() {
        return  message -> {
            log.info("Message={}", message);
            message.getPayload().forEach(event -> {
                log.info("event = {}", event);
            });
            message.getHeaders().entrySet().forEach(header -> {
                log.info("header = {}", header);
            });
        };
    }
    

    但是,我最终拥有的是两个列表,一个 List 和另一个 List,有没有办法可靠地将 Foo - Headers 配对?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-08-29
      • 1970-01-01
      • 2018-03-09
      • 2016-06-21
      • 2017-06-22
      • 1970-01-01
      • 2021-06-12
      • 1970-01-01
      相关资源
      最近更新 更多