[Posted]: 2021-10-11 09:19:35
[Question]:
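I am trying to implement a Kafka consumer over SSL, with all the required configuration supplied in application.yml.
When I start the Spring Boot Kafka consumer application, the consumer tries to connect to localhost:9092 instead of the configured Kafka brokers.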
KafkaConfig.java
@Bean
public ConsumerFactory<String, AvroRecord> consumerFactory() throws IOException {
    return new DefaultKafkaConsumerFactory<>(kafkaProps());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AvroRecord>>
        kafkaListenerContainerFactory() throws IOException {
    ConcurrentKafkaListenerContainerFactory<String, AvroRecord> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
kafkaProps() loads all of the SSL and bootstrap-server properties; I can see the values in debug mode.
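The body of kafkaProps() is not shown in the question, so the following is only a minimal sketch of what such a helper might build, assuming it returns a plain property map. The class name KafkaPropsSketch and the method parameters are hypothetical. The keys are the standard Kafka client configuration names; a real implementation would more likely use the constants from ConsumerConfig and SslConfigs in kafka-clients. The `<<location>>` placeholder is kept exactly as it appears in the question.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaPropsSketch {

    // Hypothetical stand-in for the kafkaProps() helper mentioned in the question.
    // It collects the consumer settings into the map that would be handed to
    // DefaultKafkaConsumerFactory.
    static Map<String, Object> kafkaProps(String bootstrapServers,
                                          String truststoreLocation,
                                          String truststorePassword) {
        Map<String, Object> props = new HashMap<>();
        // Brokers the consumer should contact first.
        props.put("bootstrap.servers", bootstrapServers);
        // Encrypt the connection with TLS.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststoreLocation);
        props.put("ssl.truststore.password", truststorePassword);
        // Deserializers and offset policy as listed in application.yml.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props =
                kafkaProps("broker1:9092,broker2:9092", "<<location>>", "pwd");
        // Print the value that the consumer factory would receive.
        System.out.println(props.get("bootstrap.servers"));
    }
}
```

A map like this only configures the consumers created by the factory it is passed to; other Kafka clients in the same application read their own configuration.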
application.yml
kafka:
properties:
basic:
auth:
credentials:
source: USER_INFO
user: username
pass: password
enableAutoRegister: true
max_count: 100
max_delay: 5000
schema:
registry:
url: https://schema-registry:8081
ssl:
truststore:
location: <<location>>
password: pwd
keystore:
location: <<location>>
password: pwd
key:
password: pwd
ssl:
enabled: true
protocols: TLSv1.2,TLSv1.1,TLSv1
truststore:
type: JKS
location: <<location>>
password: pwd
keystore:
type: JKS
location: <<location>>
password: pwd
key:
password: pwd
security:
protocol: SSL
consumer:
bootstrap-servers: broker1:9092,broker2:9092
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
max-message-size: 10241024
In the application logs I see the following:
18:46:33.964 [main] INFO o.a.k.c.a.AdminClientConfig requestId= transactionKey= | AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
15:53:54.608 [kafka-admin-client-thread | adminclient-1] WARN o.a.k.c.NetworkClient requestId= transactionKey= | [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
I cannot figure out why it connects to localhost instead of the configured brokers.
Tags: spring-boot apache-kafka spring-kafka