【问题标题】:Kafka consumer can't connect to broker other than localhost:9092 using Spring Boot 2.2.0.M4Kafka 消费者无法使用 Spring Boot 2.2.0.M4 连接到 localhost:9092 以外的代理
【发布时间】:2019-12-02 23:18:41
【问题描述】:

我正在使用 Spring Boot 2.2.0.M4 和 Kafka 2.2.0 尝试基于 https://www.baeldung.com/spring-kafka 的示例构建应用程序。当我为我的主题启用侦听器时,我在消费者上收到以下错误。

[AdminClient clientId=adminclient-2] 无法建立到节点 -1 (localhost/127.0.0.1:9092) 的连接。经纪人可能不可用。

以下是在我的应用程序属性中定义的。

kafka.bootstrapAddress=172.22.22.55:9092

这是 @KafkaListener 注释的方法。

@KafkaListener(topics = "add_app", groupId = "foo")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}

下面是引用 kafka.bootstrapAddress 值的 Consumer 配置类。它已正确记录。

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        log.info("Created {} using address {}.", this.getClass(), bootstrapAddress);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("foo"));
        return factory;
    }

【问题讨论】:

  • 我遇到了类似的问题,起初我认为这是我的 spring kafka 端点配置的问题,但事实证明我在我的 kafka 上错误地配置了广告监听器。看看这个答案stackoverflow.com/questions/59867685/…

标签: spring-kafka


【解决方案1】:

解决方案相当简单。我只需要将以下内容添加到 application.properties 文件中。

spring.kafka.bootstrap-servers=174.22.22.55:9092

查看KafkaProperties.java后,发现这一行:

private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

这个方法实际上是构建它们:

private Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap();
        if (this.bootstrapServers != null) {
            properties.put("bootstrap.servers", this.bootstrapServers);
        }

        if (this.clientId != null) {
            properties.put("client.id", this.clientId);
        }

        properties.putAll(this.ssl.buildProperties());
        if (!CollectionUtils.isEmpty(this.properties)) {
            properties.putAll(this.properties);
        }

        return properties;
    }

由于已经在类中预定义,所以最初在 KafkaConsumerConfig 上定义的代理没有被使用。

更新

将 containerFactory 属性添加到侦听器注释也可以修复它,并且无需更改 application.properties。

@KafkaListener(topics = "add_app", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}

【讨论】:

  • 除非您需要某种高级配置,否则您实际上并不需要任何基础设施(消费者、容器工厂)。 Boot 会从属性中自动配置它们。
  • 在我的情况下,即使在 ConsumerFactory 属性中设置了 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG 之后,application.properties 中的 spring.kafka.bootstrap-servers=myserver:myport 也需要。我想这是一个优先规则。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-03-14
  • 1970-01-01
  • 2017-08-07
  • 2016-08-01
  • 2018-08-25
  • 2023-04-11
  • 2016-12-09
相关资源
最近更新 更多