【问题标题】:Spring Boot Kafka: Unable to start consumer due to NoSuchBeanDefinitionExceptionSpring Boot Kafka:由于 NoSuchBeanDefinitionException 而无法启动消费者
【发布时间】:2017-08-16 13:12:12
【问题描述】:

在我的 Spring Boot 服务中尝试启动 kafka 消费者看到 NoSuchBeanDefinitionException 并且无法启动服务本身。

下面是我的 bean 类,其中包含为 Kafka 配置创建的所有必需 bean

Spring Boot 版本:1.5.2.RELEASE

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;

@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
 @Bean
 public Map < String, Object > consumerProps() {
  Map < String, Object > props = new HashMap < > ();
  props.put(null, null);
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  return props;
 }

 @Bean
 public Deserializer < String > stringKeyDeserializer() {
  return new StringDeserializer();
 }

 @Bean
 public Deserializer < GatewayCallBackMessage > gatewayCallBackMessageJsonValueDeserializer() {
  return new JsonDeserializer < GatewayCallBackMessage > (GatewayCallBackMessage.class);
 }

 @Bean
 public ConsumerFactory < String, GatewayCallBackMessage > consumerFactory() {
  return new DefaultKafkaConsumerFactory < > (consumerProps(),
   stringKeyDeserializer(), gatewayCallBackMessageJsonValueDeserializer());
 }


 @Bean
 public ConcurrentKafkaListenerContainerFactory < String, GatewayCallBackMessage > kafkaListenerContainerFactory1() {
  ConcurrentKafkaListenerContainerFactory < String, GatewayCallBackMessage > factory = new ConcurrentKafkaListenerContainerFactory < > ();


  factory.setConcurrency(1);
  factory.setConsumerFactory(consumerFactory());
  return factory;
 }


}

这是异常跟踪:

org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1486) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1104) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1066) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:835) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:741) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:467) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1173) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1067) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:513) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761) ~[spring-beans-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866) ~[spring-context-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542) ~[spring-context-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:134) [spring-boot-1.5.2.RELEASE.jar:1.5.2.RELEASE]
    at com.ns.services.pn.main.PushNotificationServiceLauncher.main(PushNotificationServiceLauncher.java:28) [bin/:na]

2017-03-24 00:27:04.621 ERROR 51773 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

【问题讨论】:

    标签: spring apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    将您的 kafkaListenerContainerFactory1 bean 名称更改为 kafkaListenerContainerFactory

    自动配置工厂正在寻找与您的不匹配的ConsumerFactory&lt;Object, Object&gt;,它是

    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    

    而消费者工厂是......

    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    

    或者,禁用 Kafka 自动配置。

    【讨论】:

    • 我有相同的 pb,我想禁用自动配置,但仍然使用 @EnableConfigurationProperties(KafkaProperties.class)。我在一个 jar 中使用 Kafka Spring,其他 Spring Boot 应用程序将使用它。那么如何在我的 jar 以及依赖它的任何 Spring Boot 应用程序中禁用自动配置?
    • 您应该提出一个新问题,而不是评论这个问题 - 这是一个不同的问题。您可以在@SpringBootApplication 注解中排除特定的自动配置类。
    • 感谢您的回答。我发布了一个新问题:stackoverflow.com/questions/43633263/…
    • FWIIW,自从v2.3.0.RELEASE 以来,这应该不再需要了。由github.com/spring-projects/spring-boot/issues/19221 修复
    【解决方案2】:

    1、KafkaListener配置

    @Configuration
    @EnableKafka
    public class KafkaListenerConfiguration {
    
        @Autowired
        private Environment environment;
    
        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setBatchListener(true);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "1111");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    }
    

    2、KafkaListener设置

    @KafkaListener(topics = {KafkaTopicChannel.INPUT_CHANNEL}, containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord<String, String>> list) {
        logger.debug(String.valueOf(list.size()));
    }
    

    【讨论】:

      猜你喜欢
      • 2019-07-19
      • 2016-12-09
      • 2019-11-21
      • 1970-01-01
      • 1970-01-01
      • 2019-06-22
      • 1970-01-01
      • 2018-02-18
      • 2019-08-25
      相关资源
      最近更新 更多