【问题标题】:Kafka consumer not picking mentioned Bootstrap serversKafka 消费者没有选择提到的 Bootstrap 服务器
【发布时间】:2021-10-11 09:19:35
【问题描述】:

我正在尝试使用 SSL 实现 Kafka 消费者,在 application.yml 中提供所有必需的配置;

当我启动 Spring Boot Kafka 消费者应用程序时;消费者正在尝试连接 localhost:9092 而不是提到的 Kafka Brokers。

KafkaConfig.java

@Bean
    public ConsumerFactory<String, AvroRecord> consumerFactory() throws IOException {
        return new DefaultKafkaConsumerFactory<>(kafkaProps());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AvroRecord>>
    kafkaListenerContainerFactory() throws IOException {
        ConcurrentKafkaListenerContainerFactory<String, AvroRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

kafkaProps() 正在加载所有与 SSL 和引导服务器相关的属性。值,我可以在调试模式下看到。

application.yml

kafka:
  properties:
    basic:
      auth:
        credentials:
          source: USER_INFO
          user: username
          pass: password
    enableAutoRegister: true
    max_count: 100
    max_delay: 5000
    schema:
      registry:
        url: https://schema-registry:8081
        ssl:
          truststore:
            location: <<location>>
            password: pwd
          keystore:
            location: <<location>>
            password: pwd
          key:
            password: pwd
    ssl:
      enabled: true
      protocols: TLSv1.2,TLSv1.1,TLSv1
      truststore:
        type: JKS
        location: <<location>>
        password: pwd
      keystore:
        type: JKS
        location: <<location>>
        password: pwd
      key:
        password: pwd
    security:
      protocol: SSL
    consumer:
      bootstrap-servers: broker1:9092,broker2:9092
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      max-message-size: 10241024

在应用程序日志中,我得到以下日志

18:46:33.964 [main] INFO  o.a.k.c.a.AdminClientConfig requestId=
                    transactionKey= | AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000


15:53:54.608 [kafka-admin-client-thread | adminclient-1] WARN  o.a.k.c.NetworkClient requestId=
                    transactionKey= | [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

我找不到它,为什么它连接到本地主机而不是提到的代理

【问题讨论】:

    标签: spring-boot apache-kafka spring-kafka


    【解决方案1】:
    1. The correct property is spring.kafka.bootstrap-servers。您似乎完全缺少 spring 前缀。此外,schema.registry.urlssl.truststore 等都被视为 Kafka 客户端的 singular 属性键(字符串),因此(据我所知)不应“嵌套”在 YAML 对象中

    2. 您只是尝试在消费者上设置引导属性,而不是在 AdminClient 上

    3. 在与引导服务器字符串建立初始连接后,您的客户端将始终连接到代理的 advertised.listeners,因此如果是 localhost:9092,将解释 AdminClient 日志输出

    【讨论】:

    • 因为我从 KafkaConfig 类而不是 application.yml 中提供 kafka 消费者元数据。使用 spring.kafka.* trusstore 位置,我必须将其作为绝对路径;所以我尝试了 kafkaConfig
    • 我不确定你的意思,但消费者和信任库属性不会改变我的答案。您仍然可以将引导服务器设置为所有客户端的“顶级”,而不仅仅是消费者。您还可以为 AdminClient 创建工厂
    猜你喜欢
    • 1970-01-01
    • 2018-10-31
    • 2018-09-12
    • 2020-11-15
    • 2022-08-13
    • 2019-03-15
    • 2017-11-29
    • 2017-01-04
    • 1970-01-01
    相关资源
    最近更新 更多