【问题标题】:Getting java.lang.ClassCastException: [B cannot be cast to org.springframework.messaging.Message exception after consuming batch获取 java.lang.ClassCastException: [B cannot be cast to org.springframework.messaging.Message 消费批次后异常
【发布时间】:2021-11-12 02:12:14
【问题描述】:

我正在使用spring-cloud-stream-kafka-binder-3.0.4批量消费消息,消费后将消息转换为对象但出现上述异常。

代码如下:

@StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
    @Header(name = "deliveryAttempt", defaultValue = "1") int deliveryAttempt,
    @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)  
{
    try
    {
        log.info("Received activity message with message length {} attempt {}",
              messages.size(), deliveryAttempt);
        List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
        nodeConfigActivityBatchProcessor.processNodeConfigActivity(eventList);
        acknowledgment.acknowledge();
        log.debug("Processed activity message {} successfully!!", messages.size());
    } 
    catch (Exception e)
    { 
        throw e;
    }
}

配置:

spring.cloud.stream.bindings.activity-input-channel.destination=TOPIC.FEED.NAME
spring.cloud.stream.bindings.activity-input-channel.contentType=application/json
spring.cloud.stream.bindings.activity-input-channel.consumer.batch-mode=true
spring.cloud.stream.bindings.activity-input-channel.consumer.max-attempts=1
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.auto-commit-offset=false
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.reset-offsets=true
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.start-offset=latest
spring.kafka.consumer.max-poll-records=5
spring.kafka.consumer.fetch-max-wait=60000
spring.kafka.consumer.fetch-min-size=500

我在 .collect(Collectors.toList()) 的这一行 List&lt;Event&gt; eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList()); 收到上述错误。我不知道为什么??

如果我检查Message&lt;Event&gt; eventMessage = messages.get(0) 得到相同的异常(messages 是 Message 变量的列表)。

如果批处理模式如果为false,那么它只消耗一个消息handleActivity(Message message),那么它工作正常,没有例外。

使用批处理模式时是否需要添加任何反序列化器???

【问题讨论】:

  • 该消息意味着您正在尝试将 B 的实例转换为 Message 的实例,但两者之间没有 IS-A 关系,这就是它失败的原因。
  • 我将消息发布到 kafka 生产者,并且此侦听器以 5 批消费,但之后我无法从它收到的列表中获取第一个元素(这是消息格式)。同样,如果批处理模式为 false,那么它只消耗一个消息 handleActivity(Message message),那么它工作正常。什么是B型,我不明白
  • 我也不知道 B 是什么,但那是您的错误消息中提到的类型

标签: java spring apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka


【解决方案1】:

我设法通过添加反序列化器解决了这个异常。

所以下面是我的批处理监听器,而不是像问题中提到的那样使用List&lt;Message&lt;Event&gt;&gt; messages,而是使用List&lt;Event&gt; messages

@StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
 public void handleActivity(List<Event> messages,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)  
 {
  try
  {
    log.info("Received activity message with message length {} attempt 
    {}",messages.size(), deliveryAttempt);        
   nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
    acknowledgment.acknowledge();
    log.debug("Processed activity message {} successfully!!", 
   messages.size());
 } 
 catch (Exception e)
  { 
      throw e;
  }
}

添加了以下反序列化器

public class EventDeserializer extends JsonDeserializer<Event> {
}

在属性文件中添加了以下 value.deserializer 属性。

spring.cloud.stream.kafka.bindings.input-channel.consumer.configuration.value.deserializer=com.sample.messaging.util.EventDeserializer

我在批处理侦听器中获得了事件列表。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-04-09
    • 1970-01-01
    • 2021-01-17
    • 1970-01-01
    • 1970-01-01
    • 2022-12-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多