【问题标题】:Kafka: Multiple instances in the same consumer group listening to the same partition inside for topicKafka:同一消费者组中的多个实例侦听主题内部的同一分区
【发布时间】:2019-02-18 08:21:24
【问题描述】:

我有两个 kafka 消费者实例,配置了同一个消费者组,并在同一个主题中监听分区 0。问题是当我向主题发送消息时。消息被两个实例使用,因为它们在同一个组中,所以它们不应该发生。 我正在使用 Spring Boot 配置类来配置它们。

这里是配置:

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

这里是监听器:

@KafkaListener(topicPartitions = {@TopicPartition(topic = "${kafka.topic.orders}", partitions = "0")})
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {

    log.info("message received at " + orderTopic + "at partition 0");
    processRecord(record, acknowledgment);
}

【问题讨论】:

    标签: spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    卡夫卡不是那样工作的;当您手动分配这样的分区 (@TopicPartition) 时,您明确告诉 Kafka 您希望从该分区接收消息 - 消费者 assign() 将分区分配给自己。

    换句话说,通过手动分配,您负责分配分区。

    您需要使用组管理,并让 Kafka 将主题分配给实例。

    使用topics = "...",Kafka 将完成任务。如果您没有足够的主题,实例将处于空闲状态。您需要至少与实例一样多的分区才能让所有实例都参与。

    【讨论】:

    猜你喜欢
    • 2017-12-23
    • 1970-01-01
    • 2020-09-19
    • 2021-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-07
    相关资源
    最近更新 更多