【发布时间】:2017-11-25 19:26:54
【问题描述】:
我有一个 kafka 消费者,它连接到具有 3 个分区的主题。一旦我从 kafka 获得记录,我想捕获偏移量和分区。重新启动时,我想从上次读取的偏移量恢复消费者的位置
来自 kafka 文档:
每条记录都有自己的偏移量,因此要管理自己的偏移量,您只需执行以下操作:
配置 enable.auto.commit=false
使用每个 ConsumerRecord 提供的偏移量来保存您的 位置。
重启时使用 seek 恢复消费者的位置 (TopicPartition,长)。
这是我的示例代码:
constructor{
load data into offsetMap<partition,offset>
initFlag=true;
}
Main method
{
ConsumerRecords<String, String> records = consumer.poll(100);
if(initFlag) // is this correct way to override offset position?
{
seekToPositions(offsetMap);
initFlag=false;
}
while(!shutdown)
{
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
getOffsetPositions();// dump offsets and partitions to db/disk
}
}
}
//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
//code to put partition and offset into map
//write to disk or db
}
} // Overrides the fetch offsets that the consumer
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
//code get partitions and offset from offsetMap
consumer.seek(partition, offset);
}
这是正确的做法吗?有没有更好的办法?
【问题讨论】:
标签: java apache-kafka