【发布时间】:2019-11-21 02:40:54
【问题描述】:
我们正在使用 Spring Kafka 批量消费记录。我们有时会遇到应用程序启动的问题,即使有足够的未读消息,它也不会消耗任何记录。相反,我们不断地看到信息日志说。
[INFO]-[FetchSessionHandler:handleError:440] - [Consumer clientId=consumer-2, groupId=groupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1027: org.apache.kafka.common.errors.DisconnectException.
人们正面临这个问题,每个人都说忽略它,因为它只是一个信息日志。甚至,我们看到应用程序在没有做任何事情的情况下开始获取记录。但是,开始消费记录可能需要多长时间是非常不可预测的:(
我们在使用 Spring Cloud Stream 时没有看到这个错误。不确定我们是否遗漏了 spring-kafka 中的任何配置。
过去有人遇到过这个问题,如果我们遗漏了什么,请告诉我们。 我们的主题负载很大,如果有很多滞后,会发生这种情况吗?
我们正在使用 2.2.2.RELEASE 的 Spring Kafka Spring boot 2.1.2.RELEASE Kafka 0.10.0.1(我们知道它已经很老了,因为不可避免的原因我们不得不使用它:()
这是我们的代码:
application.yml
li.topics: CUSTOM.TOPIC.JSON
spring:
application:
name: DataPublisher
kafka:
listener:
type: batch
ack-mode: manual_immediate
consumer:
enable-auto-commit: false
max-poll-records: 500
fetch-min-size: 1
fetch-max-wait: 1000
group-id: group-dev-02
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:CustomResourceDeserialiser
auto-offset-reset: earliest
消费者:
public class CustomKafkaBatchConsumer {
@KafkaListener(topics = "#{'${li.topics}'.split(',')}", id = "${spring.kafka.consumer.group-id}")
public void receiveData(@Payload List<CustomResource> customResources,
Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
}
}
反序列化器:
public class CustomResourceDeserialiser implements Deserializer<CustomResource> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public CustomResource deserialize(String topic, byte[] data) {
if (data != null) {
try {
ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
return objectMapper.readValue(data, CustomResource.class);
} catch (IOException e) {
log.error("Failed to deserialise with {}",e.getMessage());
}
}
return null;
}
@Override
public void close() {
}
}
【问题讨论】:
-
我没有答案,但是...
>We didn't see this error when we were using Spring cloud stream.这是一个红鲱鱼 - Spring Cloud Stream 在下面使用 Spring Kafka。 -
感谢@GaryRussell 可能是我们在尝试使用 Spring 云流时尝试减少延迟,但没有遇到问题。感谢您的快速回复。
标签: apache-kafka spring-cloud apache-kafka-streams spring-kafka spring-cloud-stream