【问题标题】:Using 'SchemaRegistryClient' to deserialize AVRO message in Python在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息
【发布时间】:2020-11-22 06:45:59
【问题描述】:

我们正在尝试使用来自其他系统的 AVRO 消息。 当我使用以下代码将架构指定为文件 (.avsc) 时,我能够读取 AVRO 消息,

import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
schema = avro.schema.Parse(open("schema.avsc", "rb").read())
...
bytes_reader = io.BytesIO(element) # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)

但是,我现在需要从架构注册表 URL 读取架构,

http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema

我正在从传入消息自定义属性“模式”中提取 url。现在要从我使用以下代码的 url 获取架构,

def fetch_schema(IP, subject, version):
    sr = SchemaRegistryClient(IP)
    schema = sr.get_schema(subject, version=version).schema
    return schema

使用上面用于反序列化消息的相同代码,我现在得到以下错误

AttributeError: 'AvroSchema' object has no attribute 'type' 

上线,

rec = reader.read(decoder) 

我比较了从文件读取和从 URL 获取时的“模式”变量的类型,

from file, the schema type is : <class 'avro.schema.RecordSchema'>
from URL, the schema type is : <class 'schema_registry.client.schema.AvroSchema'>

它们是不同的,因此可能是问题所在。在这里寻找一些方向。谢谢!

【问题讨论】:

    标签: python schema avro confluent-schema-registry


    【解决方案1】:

    今天我从avro.schema.RecordSchema 转换为schema_registry.client.schema.AvroSchema 时遇到了同样的问题。 一种可能的解决方案是转储到 JSON,然后使用 Avro 库对其进行解析。

    import avro.schema
    from schema_registry.client import SchemaRegistryClient
    
    client = SchemaRegistryClient(url="localhost:8081")
    test_table_schema = client.get_schema("table_0_schema").schema
    
    avro_schema = avro.schema.parse(json.dumps(test_table_schema.schema.raw_schema))
    
    reader = DatumReader(avro_schema)
    

    警告:

    在使用 KafkaAvro 时,还可能发生其他几个问题:

    • 刷新非 avro 消息的主题。我在解码以 json 格式推送的 Avro 消息时浪费了很多时间,因为 Kafka 消费者有 auto_offset_reset='earliest' 设置。
    • 使用 Confluent 版本的 Debezium 时,可能有 5 个字节专用于架构 ID。您的解码函数应如下所示:
    def decode(msg_value):
        message_bytes = io.BytesIO(msg_value)
        message_bytes.seek(5) # <-----
        decoder = BinaryDecoder(message_bytes)
        event_dict = reader.read(decoder)
        return event_dict
    

    查看this答案了解更多信息。

    【讨论】:

      【解决方案2】:

      看来您需要从 Schema Registry API 调用中获取 JSON 表示,然后您可以像以前一样使用avro.schema.Parse

      话虽如此,你可以只使用 urllib 或 requests,而你不需要 SchemaRegistryClient

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-07-12
        • 1970-01-01
        • 2019-04-11
        • 2019-11-25
        • 2021-09-06
        • 2020-08-29
        • 2021-03-31
        • 1970-01-01
        相关资源
        最近更新 更多