【发布时间】:2020-11-22 06:45:59
【问题描述】:
我们正在尝试使用来自其他系统的 AVRO 消息。 当我使用以下代码将架构指定为文件 (.avsc) 时,我能够读取 AVRO 消息,
import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
schema = avro.schema.Parse(open("schema.avsc", "rb").read())
...
bytes_reader = io.BytesIO(element) # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)
但是,我现在需要从架构注册表 URL 读取架构,
http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema
我正在从传入消息自定义属性“模式”中提取 url。现在要从我使用以下代码的 url 获取架构,
def fetch_schema(IP, subject, version):
sr = SchemaRegistryClient(IP)
schema = sr.get_schema(subject, version=version).schema
return schema
使用上面用于反序列化消息的相同代码,我现在得到以下错误
AttributeError: 'AvroSchema' object has no attribute 'type'
上线,
rec = reader.read(decoder)
我比较了从文件读取和从 URL 获取时的“模式”变量的类型,
from file, the schema type is : <class 'avro.schema.RecordSchema'>
from URL, the schema type is : <class 'schema_registry.client.schema.AvroSchema'>
它们是不同的,因此可能是问题所在。在这里寻找一些方向。谢谢!
【问题讨论】:
标签: python schema avro confluent-schema-registry