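【Question title】: Spring Cloud Kafka Stream: Error creating bean with name 'kafkaMessageChannelBinder'
【Posted】: 2021-03-17 03:23:51
【Question description】:

I am trying to use the Spring Cloud Stream Kafka Streams binder (2.4.3) to consume and produce Avro messages. With a single function I can consume messages and produce records, but I am looking for a producer that can be used in several places in the application. StreamBridge seemed like an option, so I tried the following, but it does not work. What am I missing?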

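    // Fragment of a Spring configuration class; imports listed for reference.
    import java.util.function.Consumer;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;

    @Autowired
    StreamBridge streamBridge;

    // Kafka Streams consumer bound to process-in-0; peeks at each record
    // and forwards its value through StreamBridge.
    @Bean
    public Consumer<KStream<EventKey, Event>> process() {
        return input -> input.peek((k, v) -> sendToValue(v));
    }

    // Sends the event to the "process-out-0" binding and returns it unchanged.
    private Event sendToValue(Event event) {
        System.out.println(event);
        streamBridge.send("process-out-0", event);
        System.out.println("Message sent");
        return event;
    }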

Binder configuration:

spring:
  application:
    name: ${applicaton-name}
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          destination: ${input-topic-name}
          contentType: application/Avro
        process-out-0:
          destination: ${enriched-topic-name}
          contentType: application/Avro
      binding-retry-interval: 30
      kafka:
        streams:
          binder:
            brokers: ${kafka-broker}
            application-id: ${consumer-group-name}
            auto-create-topics: false
            auto-add-partitions: false
            configuration:
              processing.guarantee: at_least_once
              auto.offset.reset: earliest
              schema.registry.url: ${kafka-schema-registry}
              auto-register-schema: false
              security.protocol: SSL
              useNativeEncoding: true
              specific.avro.reader: true

Error:

[2021-03-16 21:51:52,219] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:243] Creating binder: ktable
[2021-03-16 21:51:52,312] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:343] Caching the binder: ktable
[2021-03-16 21:51:52,313] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:347] Retrieving cached binder: ktable
[2021-03-16 21:51:52,313] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:243] Creating binder: kafka
[2021-03-16 21:51:52,372] [WARN] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.a.AnnotationConfigApplicationContext AbstractApplicationContext.java:559] Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaMessageChannelBinder' defined in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration: Unexpected exception during bean creation; nested exception is java.lang.NoClassDefFoundError: org/springframework/integration/support/management/ManageableLifecycle
[2021-03-16 21:51:52,374] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.b.a.l.ConditionEvaluationReportLoggingListener ConditionEvaluationReportLoggingListener.java:136] 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
[2021-03-16 21:51:52,379] [ERROR] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.b.SpringApplication SpringApplication.java:837] Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaMessageChannelBinder' defined in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration: Unexpected exception during bean creation; nested exception is java.lang.NoClassDefFoundError: org/springframework/integration/support/management/ManageableLifecycle
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:529)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:324)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:322)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)

Update after the version change:

[2021-03-17 09:56:28,043] [INFO] [latest-0ea7c0a5-21a4-464b-bf42-a5c35b6687dc-StreamThread-1] [o.a.k.c.u.AppInfoParser AppInfoParser.java:117] Kafka version: 6.0.1-ccs
[2021-03-17 09:56:28,043] [INFO] [latest-0ea7c0a5-21a4-464b-bf42-a5c35b6687dc-StreamThread-1] [o.a.k.c.u.AppInfoParser AppInfoParser.java:118] Kafka commitId: 9c1fbb3db1e0d69d
[2021-03-17 09:56:28,043] [INFO] [latest-0ea7c0a5-21a4-464b-bf42-a5c35b6687dc-StreamThread-1] [o.a.k.c.u.AppInfoParser AppInfoParser.java:119] Kafka startTimeMs: 1615992988043
[2021-03-17 09:56:28,047] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,151] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,357] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,568] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,987] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

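【Comments】:

  • Looks like a dependency issue. I suspect 2.4.3 is the Boot version. Which version of the binder are you using? Can you show the dependencies from your Maven pom?
  • @sobychacko Yes, it was a wrong-dependency issue. But after upgrading the binder version to 3.1.1, the producer tries to connect to localhost instead of the brokers configured for the binder. I have updated the question with the new logs.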

Tags: apache-kafka-streams spring-kafka spring-cloud-stream kafka-producer-api


【Solution 1】:

java.lang.NoClassDefFoundError: org/springframework/integration/support/management/ManageableLifecycle

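You have an old version of spring-integration-core on the classpath; that class was added in 5.4.

When using Boot, you should never declare versions yourself; let Boot bring in the correct versions through its dependency management.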
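
To confirm which spring-integration-core jar is actually being loaded, a small classpath probe can help. The following is only a sketch using plain JDK reflection; the class name `ClasspathCheck` and its methods `isPresent` and `locationOf` are hypothetical helpers, not part of Spring. It assumes you run it with the same classpath as the failing application.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    /** Returns true if the named class can be loaded from the current classpath. */
    public static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns the jar or directory a class was loaded from, if it can be determined. */
    public static String locationOf(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source
            return source == null ? "unknown (bootstrap class loader)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // The class named in the NoClassDefFoundError (present from spring-integration-core 5.4)
        String missing = "org.springframework.integration.support.management.ManageableLifecycle";
        // A long-standing spring-integration-core class, used here to find the jar in use
        String anchor = "org.springframework.integration.core.MessageSource";

        System.out.println("ManageableLifecycle present: " + isPresent(missing));
        System.out.println("spring-integration-core loaded from: " + locationOf(anchor));
    }
}
```

If the reported jar name carries a version below 5.4, an explicit version in the build is probably overriding the one Boot manages; `mvn dependency:tree` should show where it comes from.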

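【Comments】:

  • Yes, it was a wrong-dependency issue. After making the change, the producer tries to connect to localhost instead of the brokers configured for the binder. I have updated the question with the new logs.
  • Can you also try adding spring.cloud.stream.kafka.binder.brokers? Since you have StreamBridge, it uses the regular binder. If you add that, I don't think you need the one for the Streams binder (kafka.streams.binder.brokers).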
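
The suggestion in the last comment could look roughly like the fragment below. This is a sketch, not a verified fix: it reuses the `${kafka-broker}` placeholder from the question, and repeating `security.protocol: SSL` under the regular binder is an assumption based on the Streams binder settings shown there. Merge it into the existing `spring.cloud.stream` block rather than replacing it.

```yaml
spring:
  cloud:
    stream:
      kafka:
        # Regular (message channel) Kafka binder, which StreamBridge uses
        binder:
          brokers: ${kafka-broker}
          configuration:
            # Assumption: same transport security as the Streams binder
            security.protocol: SSL
```

Without a `brokers` entry here, the regular binder falls back to its default of localhost, which matches the `localhost/127.0.0.1:9092` connection attempts in the updated log.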