【发布时间】:2021-11-12 02:12:14
【问题描述】:
我正在使用spring-cloud-stream-kafka-binder-3.0.4批量消费消息,消费后将消息转换为对象但出现上述异常。
代码如下:
@StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
@Header(name = "deliveryAttempt", defaultValue = "1") int deliveryAttempt,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)
{
try
{
log.info("Received activity message with message length {} attempt {}",
messages.size(), deliveryAttempt);
List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
nodeConfigActivityBatchProcessor.processNodeConfigActivity(eventList);
acknowledgment.acknowledge();
log.debug("Processed activity message {} successfully!!", messages.size());
}
catch (Exception e)
{
throw e;
}
}
配置:
spring.cloud.stream.bindings.activity-input-channel.destination=TOPIC.FEED.NAME
spring.cloud.stream.bindings.activity-input-channel.contentType=application/json
spring.cloud.stream.bindings.activity-input-channel.consumer.batch-mode=true
spring.cloud.stream.bindings.activity-input-channel.consumer.max-attempts=1
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.auto-commit-offset=false
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.reset-offsets=true
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.start-offset=latest
spring.kafka.consumer.max-poll-records=5
spring.kafka.consumer.fetch-max-wait=60000
spring.kafka.consumer.fetch-min-size=500
我在 .collect(Collectors.toList()) 的这一行 List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList()); 收到上述错误。我不知道为什么??
如果我检查Message<Event> eventMessage = messages.get(0) 得到相同的异常(messages 是 Message 变量的列表)。
如果批处理模式如果为false,那么它只消耗一个消息handleActivity(Message message),那么它工作正常,没有例外。
使用批处理模式时是否需要添加任何反序列化器???
【问题讨论】:
-
该消息意味着您正在尝试将 B 的实例转换为 Message 的实例,但两者之间没有 IS-A 关系,这就是它失败的原因。
-
我将消息发布到 kafka 生产者,并且此侦听器以 5 批消费,但之后我无法从它收到的列表中获取第一个元素(这是消息格式)。同样,如果批处理模式为 false,那么它只消耗一个消息 handleActivity(Message
message),那么它工作正常。什么是B型,我不明白 -
我也不知道 B 是什么,但那是您的错误消息中提到的类型
标签: java spring apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka