【发布时间】:2019-03-27 21:41:32
【问题描述】:
我将 KafkaStreams 聚合与自定义 TimestampExtractor 一起使用。 当我重新启动应用程序时,我的聚合从头开始。
StreamsBuilder builder = new StreamsBuilder()
KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
KTable table = stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(aggregationMinutes)))
.aggregate(
{ new AggregatorModel() },
{ key, value, aggregate ->
return new aggregation.add(value)
}
)
.toStream()
.map({ k, v ->
new KeyValue<>(k.window().end(), v)
})
.to('output')
def config = new Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerHost)
config.put(ConsumerConfig.GROUP_ID_CONFIG, 'group-id')
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName())
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
kafkaStreams.start()
我做错了什么?
【问题讨论】:
-
您的
applicationId值是多少?每次运行都一样吗? -
应用程序 ID 不会因运行而改变。
-
元注释:您不需要指定
group.id--application.id用作组 ID。您为什么指出您使用的是自定义TimestampExtractor?这应该独立于时间戳提取器。或者您是否声称,如果您不使用自定义提取器,问题就解决了?停止应用程序后,您是否检查过它是否正确提交了偏移量(例如,使用bin/kafka-consumer-groups.sh)?您是否在关机时致电kafkaStreams#close()以确保它在关机时提交? -
我收到了记录(例如
{key: f5ter, value: {createDate: 1535724900000, name: 'name', needToCount: 4 ...}})。在我的时间戳提取器中,我返回record.createDate。聚合后,我得到了带有 Windowed 键和 myAggregatedValue 的对象。我需要自定义 TimestampExtractor 来汇总我的记录,从 - 到 createDate of record。关机前我打电话给kafkaStream.close() -
我的意思是如果我有聚合
12:00 - 12:15现在是 12:30 并且我使用 createDate12:13获得记录,我希望这条记录在聚合12:00 - 12:15中。但是如果我重新启动应用程序,聚合12:00 - 12:15从头开始
标签: spring spring-boot groovy apache-kafka apache-kafka-streams