【问题标题】:KafkaStreams interactive queries with Spring-KafkaKafkaStreams 与 Spring-Kafka 的交互式查询
【发布时间】:2019-06-05 21:16:16
【问题描述】:

我正在使用 Spring Kafka 编写 Spring Boot 应用程序。 由于我的应用程序专注于 Kafka Streams,并且我需要使用交互式查询和查询我的状态存储,我想知道:是否有任何特殊方法可以使用 Spring Kafka 访问 kafka 流状态存储

据我所见,Spring Cloud Stream Binder Kafka Streams 对交互式查询有一些支持,但我在 Spring Kafka 中找不到任何关于它们的信息。 是我遗漏了什么还是 Spring-Kafka 不支持它?

如果是这样 - 在创建我自己的 org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService 版本时,我有什么特别需要考虑的吗?

当我只使用 Kafka 和 Kafka Streams 时,包含 Spring Cloud Streams 似乎有点太多了,但是如果提供了交互式查询支持,那么我自己很难实现,也许还是建议包含它?如有任何建议,我将不胜感激。

【问题讨论】:

  • AFAIK, spring-kafka 仅适用于生产者/消费者 API,不适用于 Kafka Streams
  • 注意:spring-cloud-stream-binder-kafka-streams 包括 spring-kafka,所以没有理由不能一起使用它们
  • 如果你想避免使用 Spring Cloud Stream,通过访问底层的KafkaStreams 对象来实现你自己的InteractiveQueryService 并不难。 Spring Cloud Stream 提供了访问状态存储的惯用用法,但是如果您没有转到基于微服务的模型的要求,那么我理解您可能想直接使用 spring-kaka。
  • 是的,如果不使用 Spring Cloud Stream,在这种情况下,您需要以编程方式访问状态存储并与之交互。这就是使用活页夹可能会对您有所帮助的地方。

标签: spring-boot apache-kafka apache-kafka-streams spring-cloud-stream spring-kafka


【解决方案1】:

如果您的应用程序中有@EnableKafkaStreams,那么您可以自动连接StreamsBuilderFactoryBean。 你可以注入StreamsBuilderFactoryBean,然后用.getKafkaStreams()从中获取KafkaStreams对象:

@RestController
public class StreamingController {

  private final KafkaStreams ingestAndAggregateStream;
  public StreamingController(KafkaStreams ingestAndAggregateStream){
    this.ingestAndAggregateStream = ingestAndAggregateStream;
  }

  @RequestMapping(value = "/queryStore", method = RequestMethod.GET)
  @ResponseBody
  public Value getKVStoreValue(@RequestParam String key){
    ReadOnlyKeyValueStore<String, Value> store = ingestAndAggregateStream
        .store("KV_STORE_NAME",
            QueryableStoreTypes.<String, Value>keyValueStore());
    return store.get(key);
  }
}

【讨论】:

    【解决方案2】:

    如果你使用 Spring-Kafka 和 Kafka-Streams, 我建议您将 KafkaStreams 定义为 @Bean,这样 spring 将处理对象的生命周期,类似于

    @Configuration
    public class KafkaConfig{
      @Bean
      KafkaStreams ingestAndAggregateStream(KafkaProperties kafkaProps){
      Topology t....
      KafkaStreams stream = new KafkaStreams(t,kafkaProps.buildStreamsProperties())
      stream.start();
      return stream;
      }
    }
    

    完成此操作后,您可以在控制器中自动连接流并使用它来检索和查询 LOCAL KVStore

    @RestController
    public class StreamingController {
    
      private final KafkaStreams ingestAndAggregateStream;
      public StreamingController(KafkaStreams ingestAndAggregateStream){
        this.ingestAndAggregateStream = ingestAndAggregateStream;
      }
    
      @RequestMapping(value = "/queryStore", method = RequestMethod.GET)
      @ResponseBody
      public Value getKVStoreValue(@RequestParam String key){
        ReadOnlyKeyValueStore<String, Value> store = ingestAndAggregateStream
            .store("KV_STORE_NAME",
                QueryableStoreTypes.<String, Value>keyValueStore());
        return store.get(key);
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-01-24
      • 1970-01-01
      • 2017-12-19
      • 2019-09-19
      • 1970-01-01
      • 2023-04-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多