【发布时间】:2020-03-05 06:39:27
【问题描述】:
我有一个使用 @KafkaListerner 注释的带有 Kafka 消费者的 Spring 应用程序。正在消费的主题是日志压缩的,我们可能会遇到必须再次消费主题消息的场景。以编程方式实现这一目标的最佳方法是什么?我们不控制 Kafka 主题配置。
【问题讨论】:
我有一个使用 @KafkaListerner 注释的带有 Kafka 消费者的 Spring 应用程序。正在消费的主题是日志压缩的,我们可能会遇到必须再次消费主题消息的场景。以编程方式实现这一目标的最佳方法是什么?我们不控制 Kafka 主题配置。
【问题讨论】:
@KafkaListener(...)
public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
if (this.resetNeeded) {
consumer.seekToBeginning(consumer.assignment());
this.resetNeeded = false;
}
}
如果您想在侦听器空闲(无记录)时重置,您可以启用空闲事件并通过在 ApplicationListener 或 @EventListener 方法中侦听 ListenerContainerIdleEvent 来执行搜索。
事件有对消费者的引用。
编辑
@SpringBootApplication
public class So58769796Application {
public static void main(String[] args) {
SpringApplication.run(So58769796Application.class, args);
}
@KafkaListener(id = "so58769796", topics = "so58769796")
public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("One:" + key + ":" + value);
}
@KafkaListener(id = "so58769796a", topics = "so58769796")
public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("Two:" + key + ":" + value);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so58769796")
.compact()
.partitions(1)
.replicas(1)
.build();
}
boolean reset;
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so58769796", "foo", "bar");
System.out.println("Hit enter to rewind");
System.in.read();
this.reset = true;
};
}
@EventListener
public void listen(ListenerContainerIdleEvent event) {
System.out.println(event);
if (this.reset && event.getListenerId().startsWith("so58769796-")) {
event.getConsumer().seekToBeginning(event.getConsumer().assignment());
}
}
}
和
spring.kafka.listener.idle-event-interval=5000
EDIT2
这是另一种技术 - 在这种情况下,我们会在每次应用启动时(并按需)回退...
@SpringBootApplication
public class So58769796Application implements ConsumerSeekAware {
public static void main(String[] args) {
SpringApplication.run(So58769796Application.class, args);
}
@KafkaListener(id = "so58769796", topics = "so58769796")
public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println(key + ":" + value);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so58769796")
.compact()
.partitions(1)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
KafkaListenerEndpointRegistry registry) {
return args -> {
template.send("so58769796", "foo", "bar");
System.out.println("Hit enter to rewind");
System.in.read();
registry.getListenerContainer("so58769796").stop();
registry.getListenerContainer("so58769796").start();
};
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
【讨论】:
ConsumerSeekAware,
ConsumerSeekAware 类中有多个@KafkaListeners,它将适用于它们;你应该把你想倒带的那个放在它自己的类中。