【问题标题】:How to check SSL Kafka topic is authorized using keystore如何使用密钥库检查 SSL Kafka 主题是否已授权
【发布时间】:2021-02-18 17:48:39
【问题描述】:

我有一个简单的KafkaProducer 使用keystore.jks 文件生成到 SSL Kafka 主题中

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.key.password", "password");
props.put("ssl.keystore.password", "password");
props.put("ssl.keystore.location", "/keystore.jks");

Producer<String, String> producer = new KafkaProducer<>(props);

producer.send(new ProducerRecord<String, String>("my-topic", key, body));

但由于 KafkaProducer 是一个惰性实例,它会在第一条消息到达时创建与 Kafka 集群的连接,如果主题未经授权,则会导致运行时失败

错误:

org.apache.kafka.common.errors.TopicAuthorizationException:无权访问主题:[my-topic]

有没有办法在启动 KafkaProducer 之前使用KafkaAdminClient 或编写自定义逻辑来检查主题授权?我确实有以下代码从keystore 文件中提取证书和密钥,但找不到任何后续步骤

KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());  
char[] pwdArray = "password".toCharArray();
keyStore.load(new FileInputStream("/keystore.jks"), pwdArray);

【问题讨论】:

    标签: java ssl apache-kafka


    【解决方案1】:

    虽然您没有为trust Kafka 服务器配置必要的属性:

    ssl.truststore.location=/var/private/ssl/client.truststore.jks
    ssl.truststore.password=test1234
    

    您的 SSL 设置可能没问题,但正如您所指出的,存在与您的客户端授权相关的某种问题:

    org.apache.kafka.common.errors.TopicAuthorizationException:无权访问主题:[my-topic]

    请考虑查看Kafka authorization documentation,尤其是描述用户名是如何从您的客户端 SSL 证书的CN 派生的part

    默认情况下,SSL 用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。可以通过将 ssl.principal.mapping.rules 设置为 server.properties 中的自定义规则来改变这一点。此配置允许将 X.500 可分辨名称映射到短名称的规则列表。

    识别用户后,检查是否为他应用了必要的 ACL 和主题 my-topic。请参阅文档中提供的examples

    例如,要将上述用户添加为主题my-topic 的生产者,请尝试类似以下操作:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writeuser --producer --topic my-topic
    

    您可以验证应用到资源的 ACL,在您的用例中为 my-topic,类似以下内容:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic my-topic
    

    【讨论】:

    • 感谢命令行回答,但我正在寻找 Java 代码在连接到 Kafka 主题之前进行验证
    • 欢迎您@Deadpool。老实说,我之前没有使用过 Kafka Java API 的管理相关代码。我正在查看 API 和相关文档,并且当我意识到您的评论并且您实际上提供了新答案时,我正在尝试完成我的答案以提供基于 Java 的解决方案。你是完全正确的,这是正确的解决方案。无论如何,请考虑查看此project。查看下一条评论
    • 还有this other one。在我的研究中我遇到了,我认为它们可能会有所帮助。
    • 但老实说,您的回答帮助我找到了解决方案@jccampanero
    • 非常感谢您将我的答案标记为正确@Deadpool,我真的很感激!!如果您认为我可以提供任何帮助,请随时与我联系,我很乐意为您提供帮助。谢谢!!!
    【解决方案2】:

    经过长期研究并在@jccampanero 提供的answer 的帮助下,我创建了一段java 代码来列出针对特定主题的AdminClient 的ACLS

    AdminClient adminClient = KafkaAdminClient.create(configProps);
    
    ResourcePatternFilter resourceFilter = new ResourcePatternFilter(ResourceType.TOPIC,"my-topic", PatternType.ANY);
    
    AclBindingFilter aclBindingFilter = new AclBindingFilter(resourceFilter, AccessControlEntryFilter.ANY);
    
    DescribeAclsResult describe = adminClient.describeAcls(aclBindingFilter);
    
    KafkaFuture<Collection<AclBinding>> values = describe.values();
    
    Collection<AclBinding> aclBindings = values.get();
    
    aclBindings.forEach(acl->{
            System.out.println(String.format("%s %s %s",acl.entry().operation().name(),acl.entry().principal(),acl.entry().permissionType().name()));
        });
    

    输出:

    DESCRIBE User:CN=test-cert.com,O=Corporation,OU=1234567,L=Newyork,ST=NY,C=US ALLOW
    WRITE User:CN=CN=test-cert.com,O=Corporation,OU=1234567,L=Newyork,ST=NY,C=US ALLOW
    READ User:CN=test-cert.com,O=Corporation,OU=1234567,L=Newyork,ST=NY,C=US ALLOW
    

    现在您可以从AclBinding 中提取CN 部分,然后将其用作alias 以从JSK 文件中获取Certificate

     KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
     char[] pwdArray = "password".toCharArray();
     keyStore.load(new FileInputStream("/keystore.jks"), pwdArray);
           
     Certificate certificate = keyStore.getCertificate(alias);  //alias = test-cert.com
      if (certificate instanceof X509Certificate) {
          X509Certificate x509cert = (X509Certificate) certificate;
    
              // Get subject
              Principal principal = x509cert.getSubjectDN();
              String subjectDn = principal.getName();
              System.out.println(principal);  //test-cert.com
    
           }
                
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-09-22
      • 2019-11-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多