【发布时间】:2019-08-16 22:39:56
【问题描述】:
我有一个 kakfa 消费者,其 enable.auto.commit 设置为 false。每当我重新启动消费者应用程序时,它总是会再次读取上次提交的偏移量,然后再读取下一个偏移量。
例如。最后提交的偏移量是 50。当我重新启动消费者时,它再次首先读取偏移量 50,然后再读取下一个偏移量。
我正在执行 commitsync,如下所示。
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset));
kafkaconsumer.commitSync(offsets);
我尝试将 auto.offset.reset 设置为 earliest 和 latest 但它不会改变行为。
我在消费者配置中遗漏了什么吗?
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENT_ID");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP_ID");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,CustomDeserializer.class.getName());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
【问题讨论】:
标签: apache-kafka kafka-consumer-api