【问题标题】:KafkaSource connection to Confluent Kafka (with SSL & SchemaRegistry)KafkaSource 连接到 Confluent Kafka(使用 SSL 和 SchemaRegistry)
【发布时间】:2022-11-21 18:41:51
【问题描述】:

我尝试使用 KafkaSource(来自 MLRun)连接到 Confluent Kafka,我以前使用过这个简单的代码:

# code with usage 'kafka-python>=2.0.2'
from kafka import KafkaProducer, KafkaConsumer

consumer = KafkaConsumer(
    'ak47-data.v1',
    bootstrap_servers =[
        'cpkafka01.eu.prod:9092', 
        'cpkafka02.eu.prod:9092', 
        'cpkafka03.eu.prod:9092'
    ],
    client_id='test',
    auto_offset_reset='earliest',
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_password="***********",
    sasl_plain_username="***********",
    security_protocol='SASL_SSL',
    ssl_cafile="/v3io/bigdata/rootca.crt",
    ssl_certfile=None,
    ssl_keyfile=None)

# print first topic
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key, message.value))
    break

如何使用 KafkaSource 重写这段代码?

【问题讨论】:

  • 您使用哪个 MLRun 版本?
  • 我使用的是最后一个版本 MLRun>=1.1.2(还有 1.2.0-rc13)

标签: python confluent-kafka-python mlrun


【解决方案1】:

让我分享一下 KafkaSource 的函数代码(对于 MLRun>=1.1.0)。您还可以指定证书(参见 rootca.crt)和 kafka 主题列表。

from mlrun.datastore.sources import KafkaSource

# certificate
with open('/v3io/bigdata/rootca.crt') as x: 
    caCert = x.read()

# definition of KafkaSource
kafka_source = KafkaSource(
    brokers=['cpkafka01.eu.prod:9092', 
    'cpkafka02.eu.prod:9092', 
    'cpkafka03.eu.prod:9092'],
    topics=["ak47-data.v1"],
    initial_offset="earliest",
    group="test",
    attributes={"sasl" : {
                  "enable": True,
                  "password" : "******",
                  "user" : "*******",
                  "handshake" : True,
                  "mechanism" : "SCRAM-SHA-256"},
                "tls" : {
                  "enable": True,
                  "insecureSkipVerify" : False
                },            
               "caCert" : caCert}
)

【讨论】:

    猜你喜欢
    • 2022-12-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-05
    • 1970-01-01
    • 2017-08-30
    • 2020-06-20
    相关资源
    最近更新 更多