【问题标题】:Kafka duplicate read卡夫卡重复读取
【发布时间】:2018-01-10 06:20:28
【问题描述】:

我正在为我的项目使用 Kafka 版本 0.10.2.1Spring boot

我有一个主题的 5 个分区,可供在不同机器上运行的多个消费者(具有相同的 Group-Id)使用。

我面临的问题是

我使用这些 Kafka 警告日志重复读取一条消息

Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

由于日志表明出现此问题是因为 Kafka 消费者未能提交。

以下是关于我的用例的一些细节:

  • 我有多个主题 My-Topic 的消费者属于同一组 ID my-consumer-group

  • 消费者消费来自Kafka的消息,应用业务逻辑并将处理后的数据存储在Cassandra

  • 从 Kafka 消费消息、应用业务逻辑然后将其保存到 Cassandra 的过程从 Kafka 消费的每条消息大约需要 10 毫秒

我正在使用以下代码来创建 Kafka-consumer bean

@Configuration
@EnableKafka
public class KafkaConsumer {
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokerURL;

    @Value("${spring.kafka.session.timeout}")
    private int sessionTimeout;

    @Value("${spring.kafka.consumer.my-group-id}")
    private String groupId;

    @Value("${spring.kafka.listener.concurrency}")
    private int concurrency;

    @Value("${spring.kafka.listener.poll-timeout}")
    private int timeout;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(timeout);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
} 

这些是我正在使用的 kafka 配置

spring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2

我主要担心属于同一消费者组的多个 Kafka 消费者重复读取消息,在我的应用程序中,我必须避免重复进入数据库。

能否请您帮我检查一下我上面的 Kafka 配置和 Kafka-consumer-code,以避免重复阅读。

【问题讨论】:

    标签: java spring apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    简单的答案是不要使用autoCommit - 它会按计划提交。

    相反,让容器进行提交;使用AckModeRECORD

    但是,您仍然应该使您的代码具有幂等性 - 始终存在重新交付的可能性;只是提交策略越可靠,概率越小。

    【讨论】:

    • 问题是我在 Cassandra 中有一个计数器列,它根据 Kafka 消费者收到的消息而增加。如果发生重复读取,它将多次增加计数器,这将导致错误的分析。
    • 欢迎来到消息传递的世界。对于您的情况,不可能实现“仅一次”交付(如果您不相信我,请在谷歌上搜索)。正如我所说,您可以最小化但不能消除重复交付的可能性。考虑在你提交 kafka 偏移量之前更新 mongodb 然后服务器崩溃的情况;结果 - 重新交付。如果很关键,你必须先检查 mongo 看看你是否已经存储了这个事件。
    • 谢谢@Gary。你的回答真的帮助了我。如果“恰好一次”交付是不可能的,那么银行和关键任务系统如何工作我知道他们使用 RDBMS,但他们使用什么消息传递工具
    猜你喜欢
    • 1970-01-01
    • 2017-01-16
    • 2016-03-06
    • 1970-01-01
    • 2016-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多