【发布时间】:2019-04-12 12:57:11
【问题描述】:
我在 Kafka 中有一个具有一次性语义的消费-转换-生产应用程序。 (事务性)生产阶段产生关于同一主题的新消息,然后被消费(事务性=read_committed)。只有一个线程执行此操作,并确保在生产者事务提交之后发生消费者轮询。现在,每个消费-转换-生产-轮次我只有一个民意调查声明。
测试用例
当我运行我的测试用例时,有时可能会有其他生产者在我的生产者的事务提交之前(清晰地)发送的消息。然后我经历了以下:
我的单轮询语句只返回这条外来消息,而不是我刚才产生的消息,尽管上一轮的事务已成功提交。
问题
- 我是不是缺少了什么东西,所以我的交易是上次的结果 下一轮的消费者看不到这一轮?
- 我是否必须发出多次轮询,直到一次轮询返回 0 条记录,这告诉我该分区中服务器上的所有消息 读了吗?
- Kafka 是否可以不保证当前分区上的所有消息都被读取?也许没有“我现在已经读完这个分区”之类的东西了?
配置
-
交易型消费者
最终映射 consumerConfig = new LinkedHashMap(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER); consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, ID); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,“最新”); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
事务生产者
final Map producerConfig = new LinkedHashMap(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER); producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
我的轮询超时时间是 2 秒
- 我的理解是事务生产者是自动幂等的,acks=all
- 我的测试用例只有一个代理和一个复制。但我当然打算在生产中使用更多
- 我使用的是 Kafka 2.0
- 我的主题只有一个分区
- 我的线程有自己的消费者组并被分配到这个单一的分区
【问题讨论】:
标签: transactions apache-kafka kafka-consumer-api kafka-producer-api