【问题标题】:Avro Producer sending key without key schemaAvro Producer 发送没有密钥模式的密钥
【发布时间】:2020-12-14 05:58:59
【问题描述】:

我在 Python 2.7 中使用 Avro Producer。我需要发送带有键和值的消息, 该值在主题中有 Avro-Schema,但键没有 Avro-Schema(我无法为键添加 Schema - 遗留原因)。

这是我的代码:

def main():
    kafkaBrokers = os.environ.get('KAFKA_BROKERS')
    schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
    topic = os.environ.get('KAFKA_TOPIC')

    subject = '${}-value'.format(topic)
    sr = CachedSchemaRegistryClient(schemaRegistry)

    schema = sr.get_latest_schema(subject).schema

    value_schema = avro.loads(str(schema))

    url = 'test.com'

    value = {'url': u'test.com', 'priority': 10}

    avroProducer = AvroProducer({
        'bootstrap.servers': kafkaBrokers,
        'schema.registry.url': schemaRegistry
    }, default_value_schema=value_schema)


    key = 1638895406382020875
    
    avroProducer.produce(topic=topic, value=value, key=key)
    avroProducer.flush()

我收到以下错误:

raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key

如果我从生产函数中删除密钥:

avroProducer.produce(topic=topic, value=value)

有效。

如何在没有架构的情况下发送密钥?

【问题讨论】:

    标签: python apache-kafka avro kafka-producer-api


    【解决方案1】:

    AvroProducer 假定键和值都使用架构注册表进行编码,在键和值的负载前添加一个魔术字节和架构 ID。

    如果您想为密钥使用自定义序列化,您可以使用Producer 而不是AvroProducer。但是您有责任序列化密钥(使用您想要的任何格式)和值(这意味着对值进行编码并在魔术字节和模式 id 之前添加)。要了解这是如何完成的,您可以查看AvroProducer 代码。

    但这也意味着您必须编写自己的AvroConsumer,并且无法使用kafka-avro-console-consumer

    【讨论】:

    • 反序列化器参数可以传递给非 Avro 密钥的 avro 控制台消费者
    【解决方案2】:

    您需要使用常规的 Producer 并自己执行序列化函数

    from confluent_kafka import avro
    from confluent_kafka.avro import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer
    
    avro_serializer = AvroSerializer(schema_registry)
    serialize_avro = avro_serializer.encode_record_with_schema  # extract function definition 
    
    value_schema = avro.load('avro_schemas/value.avsc')  # TODO: Create avro_schemas folder 
    
    p = Producer({'bootstrap.servers': bootstrap_servers})
    
    value_payload = serialize_avro(topic, value_schema, value, is_key=False)
    p.produce(topic, key=key, value=value_payload, callback=delivery_report)
    

    【讨论】:

    • MessageSerializer 是否足以将对象序列化为 avro 序列化?
    • 是的。我只是将其重命名为import MessageSerializer as AvroSerializer
    猜你喜欢
    • 2021-03-22
    • 2018-08-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-18
    • 2018-03-15
    • 1970-01-01
    相关资源
    最近更新 更多