【问题标题】:spring-kafka Connection to node -1 (/192.168.xx.xx:9092) could not be established. Broker may not be availablespring-kafka 无法建立到节点 -1 (/192.168.xx.xx:9092) 的连接。经纪人可能不可用
【发布时间】:2021-01-14 18:11:57
【问题描述】:

我的 Spring boot 2.4.1 运行在 localhost (192.168.189.115),kafka 2.13-2.6.0 运行在 192.168.48.54:9092

我可以通过 http://localhost:8010/kafka/publish?message=HelloKafka success 向生产者 kafka 发布消息。

但消费者收到错误无法建立与节点 -1 (/192.168.48.54:9092) 的连接。经纪人可能不可用。

我尝试更改 server.properties

listeners=PLAINTEXT://192.168.48.54:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092

或者(也都评论过)

listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092

application.properties

server.port=8010
spring.kafka.bootstrap-servers=192.168.48.54:9092
spring.kafka.consumer.group-id=fm-group

KafKaController.java

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    
    @Autowired
    private Producer producer;

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.produce(message);
    }
    
}

Producer.java

@Service
public class Producer {
    
    private static final Logger logger = LogManager.getFormatterLogger(Producer.class);
    private static String TOPIC = "customer.topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produce(String data) {
        logger.info("Produce Topic: %s - Message: %s", TOPIC, data);
        this.kafkaTemplate.send(TOPIC, data);
    }
    
}

Consumer.java

@Service
public class Consumer {
    
    private static final Logger logger = LogManager.getFormatterLogger(Consumer.class);

    @KafkaListener(topics = "customer.topic", groupId = "fm-group")
    public void consume(String message) throws IOException {
        logger.info("Consume Message: %s", message);
    }
    
}

在 kafka 服务器中,我可以 ping 我的 ip (192.168.189.115)。我不知道为什么无法建立消费者。 我已经尝试了 stackoverflow 中的所有分辨率。请帮帮我。

EDIT#1 我将 Producer.java 更改为

public void produce(String data) {
        logger.info("Produce Topic: %s - Message: %s", TOPIC, data);              
        try {
            ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(TOPIC, data);
            logger.info("test");
            SendResult<String, String> sendResult = future.get(10, TimeUnit.SECONDS);
            logger.info("sendResult ", sendResult.getRecordMetadata());
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

我收到消息回复

org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.

看来我也无法发送消息。为什么我无法连接?请帮帮我

我使用 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Topic: customer.topic   PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: customer.topic   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: streams-wordcount-output PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,segment.bytes=1073741824

EDIT#2 我也按照建议将其放入 pom.xml org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms

但是还是不行

<dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
    </dependency>

【问题讨论】:

  • 我尝试 listeners=PLAINTEXT://:9092 但 kafka 控制台消息说它无法绑定此 IP 地址。
  • org.apache.kafka.common.KafkaException:套接字服务器无法绑定到 192.168.189.115:9092:无法分配请求的地址。
  • 检查你的服务器、生产者和消费者地址配置;保持不变
  • 感谢@zhang-yuan。我做的。还在memorynotfound.com/… 中将新项目作为示例,但仍然出现相同的错误“主题 customer.topic 在 60000 毫秒后不存在于元数据中。”
  • 我的管理员告诉我端口 9092 服务仍然没有运行,但我用 sudo ss -tulwn 端口 9092 检查,显示 2181(不知道它是什么)。我不知道如何设置端口 9092 服务对外开放

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

您不应绑定到实际 IP,因为这会将流量限制到该地址

这会打开服务器以接受端口 9092 上的所有传入连接

listeners=PLAINTEXT://0.0.0.0:9092

【讨论】:

  • 谢谢,我试试 listeners=PLAINTEXT://0.0.0.0:9092 我得到了广告。听众不能使用不可路由的元地址 0.0.0.0。使用可路由的 IP 地址。所以我需要设置 Advertisementd.listeners=PLAINTEXT://192.168.48.54:9092 但它仍然是同样的错误
  • 您只需要更改监听器,而不是广告中的监听器。这里提到了调试步骤confluent.io/blog/kafka-listeners-explained
【解决方案2】:

最后,是网络问题。 192.168.48.54 上的 firewalld 已打开,我需要禁用它。

【讨论】:

    猜你喜欢
    • 2021-08-18
    • 2018-05-20
    • 1970-01-01
    • 2019-10-03
    • 2018-07-05
    • 2018-04-11
    • 1970-01-01
    • 1970-01-01
    • 2019-12-27
    相关资源
    最近更新 更多