【问题标题】:Can a Kafka consumer commit an offset in a seperate thread?Kafka 消费者可以在单独的线程中提交偏移量吗?
【发布时间】:2018-09-27 23:34:50
【问题描述】:

Kafka 是否允许一个线程或进程使用分区中的数据,而另一个线程或进程负责在数据处理完毕后手动提交偏移量?

【问题讨论】:

  • 你能提供一个代码来说明你在哪里尝试这个吗?
  • 抱歉,我还没有代码,我正在使用复杂的现有架构。但从概念上讲,这是我们想要做的。让一个流从一个主题/分区中读取一堆记录。将读取的记录传递给应用程序的其他部分,可能使用跨共驻进程的共享内存。他们完成分析,并在完成后提交从阅读器传达的偏移量。

标签: apache-kafka offset manual


【解决方案1】:

是的,我相信这是可能的。如上所述,KafkaConsumer 对象不是线程安全的,因此每个线程都应该有自己的实例。两个实例应该具有相同的组 id,并且自动提交当然应该被禁用。有一些提交方法将特定的分区和偏移量作为参数: https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync-java.util.Map-org.apache.kafka.clients.consumer.OffsetCommitCallback-

但是,我认为在通过订阅方法(旧的高级消费者风格使用)使用自动组管理时,您可能无法做到这一点,而是您必须管理分区使用分配方法手动分配(就像旧的简单消费者一样)。但是你可以试试前者,看看是否也可以。

【讨论】:

  • 抱歉迟到了。是的,我说的是手动分区分配。
【解决方案2】:

Direct from the KafkaConsumer documentation:

Kafka 消费者不是线程安全的。所有网络 I/O 都发生在 进行调用的应用程序的线程。

...

此规则的唯一例外是wakeup(),它可以安全地从外部线程中使用来中断活动操作。

所以,不建议在一个线程之外使用消费者,超出wakeup 异常。

【讨论】:

  • 谢谢。实际上我应该澄清用例。这两个线程正在协调,但生活在两个单独的进程中。一个线程正在阅读一个 kafka 主题,当它完成阅读时,它会将内容发布到共享内存中,以供另一个进程分析并决定何时完成。因此,编写这两个进程是为了协调,这样只有一个进程读取和发布数据,而另一个进程控制提交并使用其消耗的数据执行其他特定于应用程序的工作。
  • 听起来好像都使用同一个消费者对象?
猜你喜欢
  • 1970-01-01
  • 2022-11-13
  • 2022-11-11
  • 2018-03-04
  • 2021-11-10
  • 2017-08-22
  • 1970-01-01
  • 1970-01-01
  • 2021-10-30
相关资源
最近更新 更多