【发布时间】:2018-04-26 04:30:14
【问题描述】:
我正在使用带有 Spring Boot 的 Kafka Streams。在我的用例中,当我收到客户事件时,我需要将其存储在 customer-store 物化视图中,当我收到订单事件时,我需要加入客户并订购,然后将结果存储在 customer-order 物化视图中。
StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"),Serdes.String(), customerSerde)
.withLoggingEnabled(new HashMap<>());
streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde)).to("customer-to-ktable-topic",Produced.with(Serdes.String(), customerSerde));
KTable<String, Customer> customerKTable = streamsBuilder.table("customer-to-ktable-topic", Consumed.with(Serdes.String(), customerSerde),Materialized.as(customerStateStore.name()));
这是问题所在,当我收到 Order 事件并且我的 customerKTable 返回 null 并且连接操作变得无用时。这不是它应该如何工作的。我的代码类似于Kafka Music 示例,我创建了TestConsumer 类来测试它。代码上传至Github供参考。
【问题讨论】:
标签: apache-kafka apache-kafka-streams