【发布时间】:2019-12-02 23:18:41
【问题描述】:
我正在使用 Spring Boot 2.2.0.M4 和 Kafka 2.2.0 尝试基于 https://www.baeldung.com/spring-kafka 的示例构建应用程序。当我为我的主题启用侦听器时,我在消费者上收到以下错误。
[AdminClient clientId=adminclient-2] 无法建立到节点 -1 (localhost/127.0.0.1:9092) 的连接。经纪人可能不可用。
以下是在我的应用程序属性中定义的。
kafka.bootstrapAddress=172.22.22.55:9092
这是 @KafkaListener 注释的方法。
@KafkaListener(topics = "add_app", groupId = "foo")
public void listen(String message) {
System.out.println("Received Message in group foo: " + message);
}
下面是引用 kafka.bootstrapAddress 值的 Consumer 配置类。它已正确记录。
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
log.info("Created {} using address {}.", this.getClass(), bootstrapAddress);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("foo"));
return factory;
}
【问题讨论】:
-
我遇到了类似的问题,起初我认为这是我的 spring kafka 端点配置的问题,但事实证明我在我的 kafka 上错误地配置了广告监听器。看看这个答案stackoverflow.com/questions/59867685/…
标签: spring-kafka