【问题标题】:Kafka 10 - Python Client with Authentication and AuthorizationKafka 10 - 具有身份验证和授权的 Python 客户端
【发布时间】:2023-03-13 11:53:01
【问题描述】:

我有一个启用了 SASL_SSL(身份验证(JAAS)和授权)的 Kafka10 集群。 能够使用带有以下属性的 Java 客户端通过 SASL 连接。

ssl.keystore.location="client_keystore.jks"
ssl.keystore.password="password"
ssl.truststore.location="clienttruststore"
ssl.truststore.password="password" 

并通过 JVM 参数传递 JAAS conf 文件。

-Djava.security.auth.login.config=/path/to/client_jaas.conf

有没有办法用 python 客户端实现同样的功能?

【问题讨论】:

  • 那么解决方案是什么?在下面的解决方案中,有用户名/密码...不再使用密钥库?

标签: python apache-kafka kafka-python


【解决方案1】:

以下是使用 kafka-python 客户端为 SASL_SSL 工作的配置。我在 CentOS 6 上使用 kafka-python 1.4.6 和 kafka 2.2.0。 这些配置可用于 PLAINTEXT 和 SSL 安全协议以及 SASL_SSL 和 SASL_PLAINTEXT。

用于生成密钥文件、CARoot 和自签名证书以用于 SSL 的 Bash 脚本:

#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:admin123
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

然后您可以使用以下命令来提取 CARoot.pem:

keytool -exportcert -alias CARoot -keystore server.keystore.jks -rfc -file CARoot.pem

在我的 server.properties 文件中:

listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
security.protocol=SSL
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=admin123
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=admin123
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
advertised.listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094

在我的 JAAS 配置文件(/etc/kafka/kafka_plain_jaas.conf)中:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
   username=kafka
   password=kafka-secret
   user_username=password;
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
  username=username
  password=password;
};

在启动 Kafka 服务器之前,需要运行以下命令:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_plain_jaas.conf"

Python 消费者和生产者: ssl_context 和 api_version 是导致我发生 SSL 握手错误的原因,从而导致超时。所以我把这些注释掉了。 (有一些教程提到使用这些。)

from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
logging.basicConfig(level=logging.DEBUG)

try:
    topic = "sendMessage"
    sasl_mechanism = "PLAIN"
    username = "username"
    password = "password"
    security_protocol = "SASL_SSL"

    #context = ssl.create_default_context()
    #context.options &= ssl.OP_NO_TLSv1
    #context.options &= ssl.OP_NO_TLSv1_1

    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9094',
                              #api_version=(0, 10),
                              security_protocol=security_protocol,
                              #ssl_context=context,
                              ssl_check_hostname=True,
                              ssl_cafile='../keys/CARoot.pem',
                              sasl_mechanism = sasl_mechanism,
                              sasl_plain_username = username,
                              sasl_plain_password = password)
                              #ssl_certfile='../keys/certificate.pem',
                              #ssl_keyfile='../keys/key.pem')#,api_version = (0, 10))

    producer = KafkaProducer(bootstrap_servers='localhost:9094',
                             #api_version=(0, 10),
                             security_protocol=security_protocol,
                             #ssl_context=context,
                             ssl_check_hostname=True,
                             ssl_cafile='../keys/CARoot.pem',
                             sasl_mechanism=sasl_mechanism,
                             sasl_plain_username=username,
                             sasl_plain_password=password)
                              #ssl_certfile='../keys/certificate.pem',
                              #ssl_keyfile='../keys/key.pem')#, api_version = (0,10))
    # Write hello world to test topic
    producer.send(topic, bytes("Hello World SSL"))
    producer.flush()


    for msg in consumer:
        print(msg)


except Exception as e:
    print e

【讨论】:

    【解决方案2】:

    我一直在使用如下代码连接到 IBM Message Hub,它是底层的 kafka:

    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import ssl
    
    sasl_mechanism = 'PLAIN'
    security_protocol = 'SASL_SSL'
    
    # Create a new context using system defaults, disable all but TLS1.2
    context = ssl.create_default_context()
    context.options &= ssl.OP_NO_TLSv1
    context.options &= ssl.OP_NO_TLSv1_1
    
    producer = KafkaProducer(bootstrap_servers = app.config['KAFKA_BROKERS_SASL'],
                             sasl_plain_username = app.config['KAFKA_USERNAME'],
                             sasl_plain_password = app.config['KAFKA_PASSWORD'],
                             security_protocol = security_protocol,
                             ssl_context = context,
                             sasl_mechanism = sasl_mechanism,
                             api_version = (0,10),
                             retries=5)
    
    def send_message(message):
    
        try:
            producer.send(app.config['KAFKA_TOPIC'], message.encode('utf-8'))
        except:
            print("Unexpected error:", sys.exc_info()[0])
            raise
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-08
      • 1970-01-01
      • 2012-01-03
      • 2017-08-15
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多