【问题标题】:Remote Kafka Connection Issues - Broker may not be available远程 Kafka 连接问题 - 代理可能不可用
【发布时间】:2020-02-12 13:19:38
【问题描述】:

我正在学习 Kafka,并且已经开始使用 Maven。

我在 AWS 中有一个独立的 Kafka 实例,在我的笔记本电脑上有一个 Maven 应用程序。我写了一个作为生产者的小应用程序

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {
        // create producer properties
        Properties properties = new Properties();

        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP_TO_REMOTE_SERVER>:9092");

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //create producer
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

       //producer record
        ProducerRecord <String,String> record = new ProducerRecord<String, String>("first_topic", "jello there");
        System.out.println("SENDING RECORD");
        //send data - async
        producer.send(record);

        producer.flush();

        producer.close();
        System.out.println("complete");
    }
}

当我运行它时,似乎我无法连接到远程实例。我收到以下错误。

[kafka-producer-network-thread |> producer-1] 警告 org.apache.kafka.clients.NetworkClient - [生产者 clientId=producer-1] 无法连接到节点 0 (/xx.xx.xx.xx:9092) 成立。经纪人可能不可用。

[main] 信息 org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] 关闭 Kafka 生产者 timeoutMillis = 9223372036854775807 毫秒。

查看 Stackoverflow 后,我将 server.properties 侦听器部分更新为服务器的私有 IP

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.0.1.51:9092

我应该如何将服务器上的 Kafka 配置为可远程访问和侦听?

【问题讨论】:

  • 为什么不在“listeners”部分使用公网IP地址呢? IP 必须可访问,以便您可以连接到代理。
  • 我无法将“侦听器”设置为公共 IP,因为 EC2 VPC 实例只知道它们的私有 IP,而不是公共 IP,因为它是由 AWS 进行 NAT 的。但是,您让我想到...这篇博文 rmoff.net/2018/08/02/kafka-listeners-explained 提到使用广告地址 - 将其设置为公开就可以了。

标签: java maven apache-kafka


【解决方案1】:

看到回复让我开始考虑更改我的配置以使其正常工作。我发现一篇非常好的博客文章解决了这个问题here

我的设置

我要强调这不是生产

公共子网中 VPC 中的单个 AWS EC2 实例。卡夫卡安装。我正在使用 Maven 从我的笔记本电脑远程连接到 Kafka 作为生产者。

zookeeper.properties

没有变化

更新了 server.properties ,特别是 listenersadvertised.listeners

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://<PRIVATE_IP_ADDRESS>:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://<PUBLIC_IP_ADDRESS>:9092

然后在我的 Maven 代码中,对于 BOOTSTRAP_SERVERS_CONFIG 我引用公共 IP

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {
        // create producer properties
        Properties properties = new Properties();

        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<PUBLIC_IP_ADRESS>:9092");

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //create producer
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

       //producer record
        ProducerRecord <String,String> record = new ProducerRecord<String, String>("first_topic", "good pony");
        System.out.println("SENDING RECORD");
        //send data - async
        producer.send(record);

        producer.flush();

        producer.close();
        System.out.println("complete");
    }
}

运行成功

发送记录

[kafka-producer-network-thread | producer-1] 信息 org.apache.kafka.clients.Metadata - [生产者 clientId=producer-1]

[main] INFOorg.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] 用 timeoutMillis = 关闭 Kafka 生产者 9223372036854775807 毫秒。

完成

我们看到推送给消费者的文本

【讨论】:

    【解决方案2】:

    我想您面临的主要问题是从配置的角度来看。在通过生产者沟通之前,请检查您是否进行了所有必要的更改。您需要进行以下更改:

    卡夫卡变化: 您需要在 Zookeeper.properties 中为相关代理添加配置。

    AWS 更改: 连接到 AWS 时,您需要设置传递 .pem 文件的方式。您可能需要在 AWS 实例中启用直接访问。默认情况下,它将阻止所有未知流量。

    其他方法: 我建议创建一个证书和密钥文件,将您自己的 PC 列入白名单作为有效来源。 将该证书添加到 AWS 服务器实例上的密钥库和信任库。 更改 server.properties listeners = SSL://your.host.name:9092 和您的 BOOTSTRAP_SERVERS_CONFIG

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-08-06
      • 2019-07-10
      • 1970-01-01
      • 1970-01-01
      • 2016-02-08
      • 1970-01-01
      相关资源
      最近更新 更多