【发布时间】:2019-04-11 01:27:51
【问题描述】:
我有一个相当直接的 Kafka 消费者:
MessageListener<String, T> messageListener = record -> {
doStuff( record.value()));
};
startConsumer(messageListener);
protected void startConsumer(MessageListener<String, T> messageListener) {
ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
consumerFactory(this.brokerAddress, this.groupId),
containerProperties(this.topic, messageListener));
container.start();
}
我可以毫无问题地使用消息。
现在,我需要根据 Kafka Consumer 上对 offsetsForTimes 的调用结果,从特定偏移量到 seek。
我知道我可以使用ConsumerSeekAware接口seek到某个位置:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}
现在的问题是,我无法访问回调中的 Kafka Consumer,因此我无法调用 offsetsForTimes。
还有其他方法可以实现吗?
【问题讨论】: