allenwas3

kafka 消息回溯

指定 offset 的 api

KafkaConsumer#seek
KafkaConsumer#seekToBeginning
KafkaConsumer#seekToEnd

对应

assignedState(tp).seek(offset);

assignedState(partition).reset(offsetResetStrategy);

assignedState(partition).reset(offsetResetStrategy);

首先检查当前消费者是否分配到分区,然后发送请求

// org.apache.kafka.clients.consumer.internals.SubscriptionState#assignedState    
private TopicPartitionState assignedState(TopicPartition tp) {
    TopicPartitionState state = this.assignment.stateValue(tp);
    if (state == null)
        throw new IllegalStateException("No current assignment for partition " + tp);
    return state;
}

KafkaConsumer#seek 和 KafkaConsumer#offsetsForTimes 和结合使用

所以,kafka 的消息回溯,需要给消费者发送指令,让消费者调用 seek 或 seekToBeginning 或 seekToEnd。

分类:

技术点:

相关文章:

  • 2021-11-18
  • 2021-11-28
  • 2021-12-18
  • 2021-11-18
  • 2021-08-08
  • 2021-11-18
  • 2021-11-18
猜你喜欢
  • 2021-11-30
  • 2021-05-01
  • 2021-11-18
  • 2021-11-03
  • 2021-11-18
  • 2021-11-18
  • 2021-11-18
相关资源
相似解决方案