【问题标题】:Spring Kafka: Read from two different topics in orderSpring Kafka:按顺序阅读两个不同的主题
【发布时间】:2020-05-08 02:52:29
【问题描述】:

Spring Kafka 是否有可能以有保证的顺序从不同消费者的两个不同主题中读取?例如,主题 A 存储与确定如何处理存储在主题 B 中的数据相关的信息。主题 A 被读入内存并被引用,但需要在从主题 B 中读取数据之前完全填充。下面是一个我当前设置的示例...

@Service
public class TopicA {
  @KafkaListener(topics = "topicA")
  public void consume(ConsumerRecord<String, byte[]> record) {
    // ... some code here to populate an in-memory data structure
  }
}
@Service
public class TopicB {
  @KafkaListener(topics = "topicB")
  public void consume(ConsumerRecord<String, byte[]> record) {
    // ... some code here that depends on topic A having populated the in-memory data structure
  }
}

到目前为止,我一直倾向于创建一个 Spring 启动过程(使用 @PostConstruct),该过程首先通过读取主题 A 来初始化数据结构,但无法使其正常工作。有没有人有什么建议?提前致谢!

【问题讨论】:

  • 听起来你应该让主题 A 应该是一个 KTable 并且你想在它和 B 之间加入数据

标签: java apache-kafka spring-kafka


【解决方案1】:
@KafkaListener(id = "bConsumer" topics = "topicB", autoStartup = "false")

然后当你准备好时自动装配KafkaListenerEndpointRegistry bean 和registry.getListenerContainer("bConsumer").start();

【讨论】:

  • 所以KafkaListenerEndpointRegistry 会自动装配到topicA 类中,并且只有在完全填充数据结构后才会启动主题B 消费者?您对我如何判断已完全阅读 Kafka 主题有什么建议吗?我总是使用consumerSeekCallback.seekToBeginning()从头开始阅读。
  • 您可以设置容器的idleEventInterval 并在该时间间隔过后使用ListenerContainerIdleEvent 而没有收到任何新记录。您可以通过实现ApplicationListener 或添加@EventListener 方法来使用该事件。见EventsConsuming Events
猜你喜欢
  • 2017-03-14
  • 1970-01-01
  • 2020-10-17
  • 1970-01-01
  • 1970-01-01
  • 2018-09-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多