【问题标题】:Reset Spring Boot Kafka Stream Application on modifying topics在修改主题时重置 Spring Boot Kafka Stream Application
【发布时间】:2019-12-05 13:27:18
【问题描述】:

我正在使用 spring-kafka 在 Spring Boot 应用程序中使用 StreamsBuilderFactoryBean 运行 Kafka Stream。我通过删除和重新创建将某些主题中的分区数从 100 更改为 20,但现在在运行应用程序时,出现以下错误:

Existing internal topic MyAppId-KSTREAM-AGGREGATE-STATE-STORE-0000000092-changelog has invalid partitions: expected: 20; actual: 100. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.

我无法访问课程kafka.tools.StreamsResetter,并尝试调用StreamsBuilderFactoryBean.getKafkaStreams.cleanup(),但它给出了NullPointerException。我该如何进行上述清理?

【问题讨论】:

标签: spring-boot apache-kafka apache-kafka-streams spring-kafka


【解决方案1】:

相关文档位于here

第 1 步:本地清理

对于带有StreamsBuilderFactoryBean 的Spring Boot,只需将CleanerConfig 添加到构造函数即可完成第一步:

// Before
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config));
// After
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config), new CleanupConfig(true, true));

这允许在start() 之前和stop() 之后调用KafkaStreams.cleanUp() 方法。

第 2 步:全局清理

对于第 2 步,应用程序的所有实例都已停止,只需按照文档中的说明使用该工具即可:

# In kafka directory
bin/kafka-streams-application-reset.sh --application-id "MyAppId" --bootstrap-servers 1.2.3.4:9092 --input-topics x --intermediate-topics first_x,second_x,third_x --zookeeper 1.2.3.4:2181

这是做什么的:

对于任何指定的输入主题:将应用程序的已提交消费者偏移量重置为所有分区的“主题开始”(对于消费者组 application.id)。

对于任何指定的中间主题:跳到主题的末尾,即将所有分区的应用程序提交的消费者偏移量设置为每个分区的 logSize(对于消费者组 application.id)。

对于任何内部主题:删除内部主题(这也会删除已提交的相应已提交偏移量)。

【讨论】:

    猜你喜欢
    • 2021-05-08
    • 2020-12-08
    • 2022-01-02
    • 1970-01-01
    • 2019-04-17
    • 2017-11-20
    • 1970-01-01
    • 1970-01-01
    • 2019-09-07
    相关资源
    最近更新 更多