【问题标题】:Kafka consumer - assign and seek卡夫卡消费者 - 分配和寻找
【发布时间】:2022-02-11 17:06:00
【问题描述】:

我无法理解这种 API 设计!

在下面的代码中,我们订阅了具有动态分配分区的主题列表。这完全没问题。

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList("some-topic"));

    while(true){

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        StreamSupport.stream(records.spliterator(), false)
                    .forEach(r -> {
                        System.out.println(r.key() + "::" + r.value());
                    });


    }

混乱来了。

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    //seek for specific partition
    TopicPartition partition = new TopicPartition("some-topic", 0);
    consumer.assign(Arrays.asList(partition));
    consumer.seek(partition, 0);

    while(true){

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        StreamSupport.stream(records.spliterator(), false)
                .forEach(r -> {
                    System.out.println(r.key() + "::" + r.value());
                });


    }

问题:

  1. 我们已经使用assign 方法分配了分区列表。为什么seek 方法还要查找分区信息?不知何故,我觉得它是多余的
  2. seek 方法具有带有主题和偏移量的分区。为什么需要先assign 才能调用seek

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    1 - 首先请记住,您的消费者可以通过 kafka API 分配到许多不同的主题/分区。 然后,seek 和 assign 有两个不同的独立职责,这就是为什么你可能认为它是多余的,但是当你需要返回一个偏移量,或者出于任何原因你需要重做一个偏移量时,你会使用 seek,为此, seek() 需要主题和分区信息,您使用静态分配 (assign) 或动态 (subscribe)。

    你不能只使用 seek() 而不指定主题)/分区,在很多情况下会模棱两可。

    2 - 你确定在调用 seek() 之前需要先赋值吗?我知道两者都可以在调用 poll() 之前使用..,但不知道在 seek() 之前分配是强制性的..,你有错误消息吗(我明天可能会检查并编辑这篇文章)

    亚尼克

    【讨论】:

    • 1) 可能是consumer.seek(offset)。为什么还要分区?至少这可以 2) 分配是强制性的 - java.lang.IllegalStateException: No current assignment for partition &lt;topic&gt; - 似乎没有任何意义!
    • 它不可能是 seek(offset) 仅仅因为你怎么知道你想为哪个主题或分区改变你的偏移位置?您的消费者可以绑定到多个主题/分区。至于为什么之前调用 assign(),我认为这是因为您需要 Kafka 集群知道您绑定的主题/分区,以便维护您的 consumer_offsets 的内部状态,例如
    【解决方案2】:

    简短的回答 - 不必总是在“assign”之后调用“seek”。

    长答案-

    consumer.subscribe(Arrays.asList("some-topic")); and 
    consumer.assign(Arrays.asList(partition));
    

    做类似的工作,除了一个细节 -

    “订阅”将主题分配给属于消费者组的消费者。这会处理任何未来的重新平衡,即如果将新分区添加到主题中,消费者组将自动调整其消费者以从新分区消费。

    “assign”赋予调用者代码更多的权力和责任。它不会进行任何重新平衡,调用者负责提交偏移量。 仅当您希望从特定偏移量开始消费时才调用“seek”。在这种情况下,它的偏移量为“0”,这与初始分配相同,因此使其冗余。 seek 用于转到特定的偏移量

    【讨论】:

      猜你喜欢
      • 2018-09-18
      • 2019-07-03
      • 2018-05-05
      • 2021-08-22
      • 1970-01-01
      • 1970-01-01
      • 2022-08-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多