【问题标题】:Spring Kafka - access offsetsForTimes to start consuming from specific offsetSpring Kafka - 访问 offsetsForTimes 以从特定偏移量开始消费
【发布时间】:2019-04-11 01:27:51
【问题描述】:

我有一个相当直接的 Kafka 消费者:

MessageListener<String, T> messageListener = record -> {

    doStuff( record.value()));
  };

  startConsumer(messageListener);


protected void startConsumer(MessageListener<String, T> messageListener) {
ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
    consumerFactory(this.brokerAddress, this.groupId),
    containerProperties(this.topic, messageListener));

   container.start();
}

我可以毫无问题地使用消息。 现在,我需要根据 Kafka Consumer 上对 offsetsForTimes 的调用结果,从特定偏移量到 seek

我知道我可以使用ConsumerSeekAware接口seek到某个位置:

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

      assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}

现在的问题是,我无法访问回调中的 Kafka Consumer,因此我无法调用 offsetsForTimes

还有其他方法可以实现吗?

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    使用ConsumerAwareRebalanceListener 进行初始搜索(在 2.0 中引入)。

    当前版本是 2.2.0。

    How to test a ConsumerAwareRebalanceListener?

    【讨论】:

    • 我添加了指向另一个答案的链接。
    • 嗨加里,我正在使用 apache Kafka 消费者,我的 offsetfortimes 方法只返回几个分区偏移位置,尽管所有分区都已分配。我检查 ConsumerRebalanceListner.onPartitionsAssigned 方法接收所有分区但是当我为onPartitionsAssigned 方法内的所有分区它只返回 1 或 2 个分区偏移位置。你对 pblm 有什么想法吗?我也对此发表了一个问题。
    • 在cmets中最好不要问新问题,但我found your other question。以后如果您对旧答案发表评论,请添加指向新问题的链接。
    • 谢谢加里。将来肯定会这样做。
    猜你喜欢
    • 2020-05-17
    • 2016-10-23
    • 1970-01-01
    • 2016-11-20
    • 2019-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多