【问题标题】:Kafka: Polling after producer transaction doesn't get the produced messagesKafka:生产者事务后轮询没有得到产生的消息
【发布时间】:2019-04-12 12:57:11
【问题描述】:

我在 Kafka 中有一个具有一次性语义的消费-转换-生产应用程序。 (事务性)生产阶段产生关于同一主题的新消息,然后被消费(事务性=read_committed)。只有一个线程执行此操作,并确保在生产者事务提交之后发生消费者轮询。现在,每个消费-转换-生产-轮次我只有一个民意调查声明。

测试用例

当我运行我的测试用例时,有时可能会有其他生产者在我的生产者的事务提交之前(清晰地)发送的消息。然后我经历了以下:

我的单轮询语句只返回这条外来消息,而不是我刚才产生的消息,尽管上一轮的事务已成功提交。

问题

  1. 我是不是缺少了什么东西,所以我的交易是上次的结果 下一轮的消费者看不到这一轮?
  2. 我是否必须发出多次轮询,直到一次轮询返回 0 条记录,这告诉我该分区中服务器上的所有消息 读了吗?
  3. Kafka 是否可以不保证当前分区上的所有消息都被读取?也许没有“我现在已经读完这个分区”之类的东西了?

配置

  • 交易型消费者

    最终映射 consumerConfig = new LinkedHashMap(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER); consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, ID); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,“最新”); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

  • 事务生产者

    final Map producerConfig = new LinkedHashMap(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER); producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

  • 我的轮询超时时间是 2 秒

  • 我的理解是事务生产者是自动幂等的,acks=all
  • 我的测试用例只有一个代理和一个复制。但我当然打算在生产中使用更多
  • 我使用的是 Kafka 2.0
  • 我的主题只有一个分区
  • 我的线程有自己的消费者组并被分配到这个单一的分区

【问题讨论】:

    标签: transactions apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    为了让您了解 poll 的工作原理,我们传递给 poll() 的参数是一个超时间隔,用于控制如果消费者缓冲区中没有数据,poll() 将阻塞多长时间。如果设置为 0,poll() 将立即返回;否则,它将等待指定的毫秒数让数据从代理到达。因此,如果您将轮询配置为 0 毫秒并且数据缓冲区中没有数据,您将不会收到任何数据。

    至于您没有收到最近生成的数据,这取决于您的生产者配置。除非生成的消息没有副本并且基于 acks 参数,否则消息将可供消费者使用。

    例如:如果您将副本设置为 3 并且 acks=all,除非所有复制者都确认领导者已收到消息,否则该消息将无法供消费者使用。

    来到这个问题,你怎么知道你是否已经阅读了整个分区,如果你的投票不再给你任何记录(假设其余的都工作正常)那么它表明你已经消费了该主题的所有消息.

    【讨论】:

    • 谢谢。我编辑了我的问题,以便更清楚地表明我不确定我是否已经在制作人方面做了我能做的一切。您的评论回答了我的问题 2 和 3。您能否检查我的生产者配置是否确保不会发生这种情况:“除非所有复制器都承认领导者他们已收到消息,否则该消息将无法供消费者使用。”
    • 我添加了一个循环轮询,直到它返回空并且它像一个魅力一样工作。非常感谢!
    猜你喜欢
    • 2019-01-15
    • 2019-10-03
    • 1970-01-01
    • 2018-02-08
    • 2017-01-04
    • 2021-03-14
    • 1970-01-01
    • 2020-02-09
    • 1970-01-01
    相关资源
    最近更新 更多