【问题标题】:Spring Kafka Streams - Create Stream vs Inject an existing Stream beanSpring Kafka Streams - 创建 Stream 与注入现有 Stream bean
【发布时间】:2019-12-01 14:41:22
【问题描述】:

我正在使用 Kafka Streams 创建一个基于 Spring 的服务,并注意到当我使用现有的 @Bean KStream VS 为该流创建一个流时会产生很大的耗时。

假设我有以下代码:

@Autowired
private StreamsBuilder eventsStreamsBuilder;

@Bean("eventsKStream")
public KStream<String, String> eventsKStream() {

   KStream<String, String> stream = eventsStreamsBuilder.stream(...);
   stream.to("NEW_TOPIC");
   return stream;
}

// 1. First approach, create a stream for the "NEW_TOPIC" and filter it
@Bean("eventsFilterKStream")
public KStream<String, String> eventsFilterKStream() {
   return eventsStreamsBuilder.stream("NEW_TOPIC",....)
   .filter()
}

// 2. Second approach, inject the existing Stream Bean and filter it
@Bean("eventsFilterKStream")
public KStream<String, String> eventsFilterKStream(@Qualifier("eventsKStream") KStream<String, String> eventsKStream) {
   return eventsKStream.filter(...);
}

我有一个消费者,使用控制台消费者...并注意到对于第一种方法,数据到达需要几秒钟,但在第二种方法中,我会立即获得过滤后的数据!

你能解释一下有什么区别吗? 为什么第一种方法需要几秒钟才能让消息到达消费者? 最好的方法是什么?

谢谢!

【问题讨论】:

    标签: java spring spring-boot apache-kafka-streams


    【解决方案1】:

    方法 #2 更快,因为它向现有的 KStream 添加了一个过滤器处理器节点,而不是在方法 #1 中创建一个全新的 KStream 从源主题读取。

    这取决于您的拓扑结构,但我会从 #2 开始,除非您有充分的理由不这样做。

    【讨论】:

    • 第一种方法和第二种方法之间有几秒的延迟是否合理?我的意思是,我只是将记录过滤到 NEW_TOPIC,然后为 NEW_TOPIC 打开一个 KStream,并期望它会超级快......但是,可能我错了,仍然......我不明白为什么这么慢.
    猜你喜欢
    • 1970-01-01
    • 2018-01-19
    • 1970-01-01
    • 2018-04-28
    • 1970-01-01
    • 1970-01-01
    • 2021-09-03
    • 2020-12-22
    • 2019-07-23
    相关资源
    最近更新 更多