【问题标题】:Spring kafka reset aggregation after restart application重启应用程序后Spring kafka重置聚合
【发布时间】:2019-03-27 21:41:32
【问题描述】:

我将 KafkaStreams 聚合与自定义 TimestampExtractor 一起使用。 当我重新启动应用程序时,我的聚合从头开始。

    StreamsBuilder builder = new StreamsBuilder()
    KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))

    KTable table = stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(aggregationMinutes)))
            .aggregate(
            { new AggregatorModel() },
            { key, value, aggregate ->


                return new aggregation.add(value)
            }
    )
            .toStream()
            .map({ k, v ->
        new KeyValue<>(k.window().end(), v)
    })
            .to('output')

    def config = new Properties()
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerHost)
    config.put(ConsumerConfig.GROUP_ID_CONFIG, 'group-id')
    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName())
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
    kafkaStreams.start()

我做错了什么?

【问题讨论】:

  • 您的applicationId 值是多少?每次运行都一样吗?
  • 应用程序 ID 不会因运行而改变。
  • 元注释:您不需要指定group.id -- application.id 用作组 ID。您为什么指出您使用的是自定义TimestampExtractor?这应该独立于时间戳提取器。或者您是否声称,如果您不使用自定义提取器,问题就解决了?停止应用程序后,您是否检查过它是否正确提交了偏移量(例如,使用bin/kafka-consumer-groups.sh)?您是否在关机时致电 kafkaStreams#close() 以确保它在关机时提交?
  • 我收到了记录(例如{key: f5ter, value: {createDate: 1535724900000, name: 'name', needToCount: 4 ...}})。在我的时间戳提取器中,我返回 record.createDate。聚合后,我得到了带有 Windowed 键和 myAggregatedValue 的对象。我需要自定义 TimestampExtractor 来汇总我的记录,从 - 到 createDate of record。关机前我打电话给kafkaStream.close()
  • 我的意思是如果我有聚合 12:00 - 12:15 现在是 12:30 并且我使用 createDate 12:13 获得记录,我希望这条记录在聚合 12:00 - 12:15 中。但是如果我重新启动应用程序,聚合12:00 - 12:15 从头开始​​

标签: spring spring-boot groovy apache-kafka apache-kafka-streams


【解决方案1】:

我发现了问题所在。我在 3 天多前汇总了数据,但参数“windowstore.changelog.additional.retention.ms”默认设置为 24 小时。我的聚合从一开始就开始了。当我汇总当天的数据时,一切正常。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多