[Question title]: Getting store for GlobalKTable crashes after upgrading to kafka-streams:5.5.0-css (Apache Kafka 2.5.0) [RESOLVED]
[Posted]: 2020-10-05 22:06:45
[Question]:

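I have a Spring Boot application that uses a GlobalKTable. It stopped working after I upgraded kafka-streams from 5.3.2-css (Apache Kafka 2.3.1) to 5.5.0-css (Apache Kafka 2.5.0).

Here is my configuration: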

@Configuration
@EnableKafkaStreams
public class GlobalTableConfiguration {

    public GlobalTableConfiguration() {
    }

    @Bean
    public GlobalKTable<String, String> table(StreamsBuilder kStreamsBuilder) {
        return kStreamsBuilder.globalTable("topic1", Consumed.with(null, null), 
                                            Materialized.as("topic1-store"));
    }
}

I get the store like this:

streamsBuilderFactoryBean.getKafkaStreams().
                store("topic1-store", QueryableStoreTypes.keyValueStore());

This fails with:

Request processing failed; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)


Caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:316)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1182)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1169)

Before that, I can see the stream threads being shut down:

2020-06-16 13:22:46.943  INFO 72423 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-06-16 13:22:46.944  INFO 72423 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-06-16 13:22:46.944  INFO 72423 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1592299366943
2020-06-16 13:22:46.946  INFO 72423 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: aKrIp_7wQcqF9OlSUoBgSQ
2020-06-16 13:22:47.496  INFO 72423 --- [    Test worker] org.apache.kafka.streams.KafkaStreams    : stream-client [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4] State transition from ERROR to PENDING_SHUTDOWN
2020-06-16 13:22:47.497  INFO 72423 --- [ms-close-thread] o.a.k.s.p.internals.StreamThread         : stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-StreamThread-1] Informed to shut down
2020-06-16 13:22:47.497  INFO 72423 --- [ms-close-thread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] State transition from RUNNING to PENDING_SHUTDOWN
2020-06-16 13:22:47.557  INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] Shutting down
2020-06-16 13:22:47.571  INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] State transition from PENDING_SHUTDOWN to DEAD
2020-06-16 13:22:47.571  INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] Shutdown complete

After some experimenting, I got it working by adding this to my configuration:

    @Bean
    public KStream kStream(StreamsBuilder kStreamsBuilder) {
        return kStreamsBuilder.stream("some-topic", Consumed.with(null, null));
    }

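So basically, as soon as I define any KStream (consuming from any topic), the stream thread stays alive and everything works as it did before the upgrade. My question is: what is the correct way to do this without the useless bean (and topic)?

Edit

A similar problem is discussed here: Kafka Streams 2.5.0 requires input topic. It looks like this will be fixed in kafka-streams 2.5.1, and until then setting num.stream.threads: 0 is a better workaround than declaring a dummy stream.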
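
For illustration, the num.stream.threads: 0 workaround could be expressed as plain Kafka Streams properties roughly as follows. This is only a sketch under assumptions: the class name GlobalTableOnlyStreamsProps and the method build are hypothetical, the application id and bootstrap address are placeholders, and the string keys are written out literally (they correspond to the StreamsConfig constants named in the comments) so that the snippet compiles without the Kafka jars.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka or Spring.
public class GlobalTableOnlyStreamsProps {

    // Builds properties for a topology that contains only a GlobalKTable.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Same key as StreamsConfig.APPLICATION_ID_CONFIG
        props.setProperty("application.id", applicationId);
        // Same key as StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same key as StreamsConfig.NUM_STREAM_THREADS_CONFIG.
        // Workaround for 2.5.0: start no regular stream threads, so nothing
        // polls a consumer without subscriptions; the global stream thread
        // that maintains the GlobalKTable is started separately.
        props.setProperty("num.stream.threads", "0");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with the real application id and brokers.
        Properties props = build("app", "localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a Spring Boot application the same key can probably be passed through the streams properties map instead (for example spring.kafka.streams.properties.num.stream.threads=0), assuming the auto-configured streams settings are in use. Once the application runs on a version that contains the fix, the override should be removed so that regular stream threads start again when a KStream is added.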

[Discussion]:

    Tags: java spring apache-kafka apache-kafka-streams spring-kafka


    [Answer 1]:

    This does not appear to be related to Spring; it is caused by some internal changes in the kafka-streams classes.

    The following works with Boot 2.2.x (kafka-streams 2.3.x).

    @SpringBootApplication
    @EnableKafkaStreams
    public class So62406117Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So62406117Application.class, args);
        }
    
        @Bean
        public GlobalKTable<String, String> table(StreamsBuilder kStreamsBuilder) {
            return kStreamsBuilder.globalTable("topic1", Consumed.with(null, null),
                    Materialized.as("topic1-store"));
        }
    
        @Bean
        public ApplicationRunner runner(StreamsBuilderFactoryBean fb) {
            return args -> {
                ReadOnlyKeyValueStore<Object, Object> store =
                        fb.getKafkaStreams().store("topic1-store", QueryableStoreTypes.keyValueStore());
                System.out.println(store);
            };
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
        }
    
    }
    

    But it fails with Boot 2.3 (kafka-streams 2.5.0).

    We definitely start the KafkaStreams instance (in the factory bean's start() method), but during that start() we get:

    
    java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.0.jar:na]
    
    2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
    2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] Shutting down
    2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
    2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
    2020-06-16 17:44:02.704  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
    2020-06-16 17:44:02.704  INFO 10635 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [foo-235af8e6-6618-4e73-86ad-75307130004b] State transition from REBALANCING to ERROR
    2020-06-16 17:44:02.704 ERROR 10635 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [foo-235af8e6-6618-4e73-86ad-75307130004b] All stream threads have died. The instance will be in error state and should be closed.
    2020-06-16 17:44:02.704  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] Shutdown complete
    

    [Comments]:

    • Thanks, you are right. This is an issue in Kafka 2.5.0 and it will be fixed in 2.5.1. I have edited my original post.