【发布时间】:2021-02-18 17:48:39
【问题描述】:
我有一个简单的KafkaProducer 使用keystore.jks 文件生成到 SSL Kafka 主题中
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.key.password", "password");
props.put("ssl.keystore.password", "password");
props.put("ssl.keystore.location", "/keystore.jks");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", key, body));
但由于 KafkaProducer 是一个惰性实例,它会在第一条消息到达时创建与 Kafka 集群的连接,如果主题未经授权,则会导致运行时失败
错误:
org.apache.kafka.common.errors.TopicAuthorizationException:无权访问主题:[my-topic]
有没有办法在启动 KafkaProducer 之前使用KafkaAdminClient 或编写自定义逻辑来检查主题授权?我确实有以下代码从keystore 文件中提取证书和密钥,但找不到任何后续步骤
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
char[] pwdArray = "password".toCharArray();
keyStore.load(new FileInputStream("/keystore.jks"), pwdArray);
【问题讨论】:
标签: java ssl apache-kafka