【问题标题】:Kafka Streams Do Not Restart with Aggregated ValuesKafka 流不会以聚合值重新启动
【发布时间】:2018-06-06 23:31:40
【问题描述】:

我正在像这样在流上聚合值:

private KTable<String, StringAggregator> aggregate(KStream<String, String> inputStream) {
    return inputStream
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .aggregate(
                    StringAggregator::new,
                    (k, v, a) -> {
                        a.add(v);
                        return a;
                    }, Materialized.<String, StringAggregator>as(Stores.persistentKeyValueStore("STATE_STORE"))
                            .withKeySerde(Serdes.String())
                            .withValueSerde(getValueSerde(StringAggregator.class)));
}

通常,这可以完美运行。但是,当应用程序重新启动时,键的聚合值会丢失。此外,整个服务器也有可能被终止,而一个新的服务器(带有新版本的流应用程序)将上线。如何确保聚合的值保持不变?

【问题讨论】:

  • 您的代码看起来正确,应该保留聚合值。我不清楚,为什么你可能会丢失数据。是否创建了商店更改日志主题?日志是否显示任何可疑内容?
  • 是的,商店主题已创建,日志看起来很正常。
  • 嗯...你试过打电话给.withLoggingEnabled()吗?此外,您可以简化并仅传递“STATE_STORE”名称而不是供应商。

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

我最终创建了聚合逻辑,该逻辑使用已在 kafka 主题上持久化的聚合结果。逻辑如下:

private KStream<String, StringAggregator> getAggregator(String topicName, 
                                                        KStream<String, String> input,
                                                        KTable<String, StringAggregator> aggregator) {

    return input
            .leftJoin(aggregator, (inputMessage, aggregatorMessage) -> { 
                if (aggregatorMessage == null) { 
                    aggregatorMessage = new StringAggregator(); 
                }
                aggregatorMessage.add(inputMessage);
                return aggregatorMessage; 
            }).peek((k, v) -> logger.info("Aggregated a join input for {}: {}, {} aggregated.", topicName, k, v.size()));
}

这是实际构建流的逻辑。

String topicName = "input";
KStream<String, String> input = streamsBuilder.stream(topicName);
KTable<String, StringAggregator> aggregator = streamsBuilder.table("aggregate");
getAggregator(topicName, input, aggregator).to("aggregate");

【讨论】:

    猜你喜欢
    • 2019-05-14
    • 1970-01-01
    • 2019-03-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多