转自: https://www.jianshu.com/p/932663e9a226

 

consumer.subscribe(topicA);
consumer.poll(100);//正常订阅topic和poll消息

Set<TopicPartition> assignments = consumer.assignment();//获取consumer所分配的分区信息
Map<TopicPartition, Long> query = new HashMap<>();//构造offsetsForTimes参数,通过时间戳找到offset
for (TopicPartition topicPartition : assignments) {
    System.out.println(topicPartition);
    query.put(topicPartition, 1550804131000L);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
    System.out.println(entry);
    consumer.seek(entry.getKey(), entry.getValue().offset());//每个topic的partition都seek到执行的offset
}
  

 

相关文章:

  • 2022-12-23
  • 2021-11-23
  • 2021-09-27
  • 2021-11-18
  • 2022-02-08
  • 2022-01-19
  • 2021-09-02
  • 2021-12-19
猜你喜欢
  • 2022-12-23
  • 2021-11-18
  • 2021-10-13
  • 2022-02-01
  • 2021-11-18
  • 2021-08-12
  • 2022-12-23
相关资源
相似解决方案