【发布时间】:2021-02-09 08:00:12
【问题描述】:
我能够使用 client.properties 中的以下 ssl 详细信息连接到 kafka 并从 CLI (bin/kafka-console-consumer.sh) 读取数据
ssl.keystore.location=/test/keystore.jks
ssl.keystore.password=abcd1234
ssl.key.password=abcd1234
Command: bin/kafka-console-consumer.sh --bootstrap-server 'server details' --topic topic_name --consumer.config client.properties --group group-id
但我无法使用相同的数据从 python 或 spark 连接
consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_server,security_protocol='SSL',sasl_mechanism='PLAIN',ssl_certfile='certificate.pem',ssl_keyfile='pk.key')
我尝试在上面的代码中更改多个选项,例如添加 check_host_name 等,但没有运气。 kafka 不属于我们的团队,由不同的团队管理它,当我们请求访问时,我们会获得一个私钥和证书以及 CA 包和 ARN 名称。
从 Spark(Python),我尝试了下面的代码
sdf1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers",bootstrap_server)
.option("subscribe", topic_name)
.option("startingOffsets", "latest")
.option("kafka.security.protocol","SSL")
.option("kafka.ssl.keystore.location",'keystore.jks')
.option("kafka.ssl.keystore.password", '****')
.option("kafka.ssl.key.password",'****')
.load()
我收到类似“org.apache.kafka.common.errors.GroupAuthorizationException:未授权访问组:spark-kafka-source-xxxxxxx-xxxxx-xxxxx”的错误
上述错误可能与 spark 每次访问时生成唯一组 id 有关。只有在 spark 3.0 及更高版本中才允许在 spark 数据框中使用 group-id。我需要在 spark 2.4.4 中解决这个问题。
任何建议将不胜感激。
【问题讨论】:
标签: python apache-spark ssl pyspark apache-kafka