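【Posted】: 2018-01-10 06:20:28
【Question】:
I am using Kafka version 0.10.2.1 and Spring Boot for my project.
I have a topic with 5 partitions, consumed by multiple consumers (all with the same group ID) running on different machines.
The problem I am facing is:
I am getting duplicate reads of a single message, along with these Kafka warning logs: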
Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
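As the log indicates, the problem arises because the Kafka consumer failed to commit its offsets.
Here are a few details about my use case:
I have multiple consumers of the topic My-Topic, all belonging to the same group ID my-consumer-group.
Each consumer consumes messages from Kafka, applies business logic and stores the processed data in Cassandra.
Consuming a message from Kafka, applying the business logic and saving the result to Cassandra takes about 10 ms per message.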
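To put the 10 ms figure next to the limits the warning mentions, here is a rough sketch of the arithmetic. It assumes the client defaults that I believe apply to 0.10.2.x, namely max.poll.records = 500 and max.poll.interval.ms = 300000, since neither is set in my configuration. The class PollBudget and its method names are hypothetical and exist only for this illustration.

```java
public class PollBudget {

    // Worst-case time to process one batch returned by poll(), in milliseconds.
    public static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True if a full batch would be processed before max.poll.interval.ms elapses.
    public static boolean fitsInPollInterval(int maxPollRecords,
                                             long perRecordMillis,
                                             long maxPollIntervalMillis) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMillis;
    }

    public static void main(String[] args) {
        int maxPollRecords = 500;           // assumed client default, not set in my config
        long maxPollIntervalMs = 300_000L;  // assumed client default, not set in my config
        long perRecordMs = 10L;             // measured average per message

        System.out.println("Worst-case batch time (ms): "
                + worstCaseBatchMillis(maxPollRecords, perRecordMs));
        System.out.println("Fits within max.poll.interval.ms: "
                + fitsInPollInterval(maxPollRecords, perRecordMs, maxPollIntervalMs));
    }
}
```

Under these assumptions a full batch takes about 5 seconds on average, far below the poll interval, so the 10 ms average alone would not explain the rebalance. It only accounts for the average; occasional slow writes or pauses are not captured by this estimate.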
I am using the following code to create the Kafka consumer bean:
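import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokerURL;

    @Value("${spring.kafka.session.timeout}")
    private int sessionTimeout;

    @Value("${spring.kafka.consumer.my-group-id}")
    private String groupId;

    @Value("${spring.kafka.listener.concurrency}")
    private int concurrency;

    @Value("${spring.kafka.listener.poll-timeout}")
    private int timeout;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    // Listener container factory: one consumer thread per unit of concurrency.
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(timeout);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Raw Kafka consumer properties. max.poll.interval.ms and max.poll.records
    // are not set here, so the client defaults apply.
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}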
These are the Kafka configurations I am using:
spring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2
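My main concern is that a message is read more than once by Kafka consumers belonging to the same consumer group; in my application I must avoid duplicate entries in the database.
Could you please help me review my Kafka configuration and Kafka consumer code above so that I can avoid duplicate reads?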
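To make the requirement concrete: a record that is delivered a second time after a rebalance should have no effect on the database. Below is a minimal sketch of that idea, assuming a record can be identified by its topic, partition and offset. The class SeenRecords and its in-memory set are hypothetical stand-ins for illustration only; this is not what my application does today, and an in-memory set would not work across consumers on different machines.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SeenRecords {

    // Keys of records that have already been processed (in memory only).
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time a record is seen, false on any redelivery.
    public boolean markIfNew(String topic, int partition, long offset) {
        return seen.add(topic + ":" + partition + ":" + offset);
    }

    public static void main(String[] args) {
        SeenRecords records = new SeenRecords();
        // First delivery of the record: should be processed.
        System.out.println(records.markIfNew("My-Topic", 0, 42L));
        // Same record delivered again: should be skipped.
        System.out.println(records.markIfNew("My-Topic", 0, 42L));
    }
}
```

In a multi-machine setup the same check would have to live in shared storage, for example by deriving the database key from the record identity so that a repeated write overwrites the same row instead of adding a new one.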
【Discussion】:
Tags: java spring apache-kafka kafka-consumer-api spring-kafka