【发布时间】:2018-06-06 23:31:40
【问题描述】:
我正在像这样在流上聚合值:
private KTable<String, StringAggregator> aggregate(KStream<String, String> inputStream) {
return inputStream
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.aggregate(
StringAggregator::new,
(k, v, a) -> {
a.add(v);
return a;
}, Materialized.<String, StringAggregator>as(Stores.persistentKeyValueStore("STATE_STORE"))
.withKeySerde(Serdes.String())
.withValueSerde(getValueSerde(StringAggregator.class)));
}
通常,这可以完美运行。但是,当应用程序重新启动时,键的聚合值会丢失。此外,整个服务器也有可能被终止,而一个新的服务器(带有新版本的流应用程序)将上线。如何确保聚合的值保持不变?
【问题讨论】:
-
您的代码看起来正确,应该保留聚合值。我不清楚,为什么你可能会丢失数据。是否创建了商店更改日志主题?日志是否显示任何可疑内容?
-
是的,商店主题已创建,日志看起来很正常。
-
嗯...你试过打电话给
.withLoggingEnabled()吗?此外,您可以简化并仅传递“STATE_STORE”名称而不是供应商。
标签: java apache-kafka apache-kafka-streams