【发布时间】:2018-03-23 09:56:43
【问题描述】:
我可以访问 Apache Kafka 集群,并且获得了一个描述消息的 Apache Avro 序列化格式的文件。我正在用 python 编写一个小型测试使用者,尝试解析架构时出现以下错误:
SchemaParseException: Type property "{u'items': u'com.myapp.avromsg.common.MilestoneField', u'type': u'array'}" not a valid Avro schema: Items schema (com.myapp.avromsg.common.MilestoneField) not a valid Avro schema: Could not make an Avro Schema object from com.myapp.avromsg.common.MilestoneField. (known names: [u'com.myapp.avromsg.runstatus.RunStatusMessage'])
在我看来,错误来自不了解自定义字段类型 MilestoneField。我将如何将这个字段描述到我的脚本中,以便正确解析序列化格式?
这是my_msg.avsc avro 文件:
{
"type": "record",
"name": "RunStatusMessage",
"namespace": "com.myapp.avromsg.runstatus",
"fields": [
{
"name": "datasetID",
"type": "string"
},
{
"name": "runID",
"type": ["string", "null"]
},
{
"name": "registryRunID",
"type": ["string", "null"]
},
{
"name": "status",
"type": "string"
},
{
"name": "logs",
"type": ["string", "null"]
},
{
"name": "jobID",
"type": ["string", "null"]
},
{
"name": "validationsJson",
"type": ["string", "null"]
},
{
"name": "zone",
"type": "string"
},
{
"name": "milestoneFields",
"type": {
"type": "array",
"items": "com.myapp.avromsg.common.MilestoneField"
}
},
{
"name": "ingestionParams",
"type": {
"type": "array",
"items": "com.myapp.avromsg.common.MilestoneField"
},
"default": []
},
{
"name": "timestamp",
"type": [
{
"type": "long",
"logicalType": "timestamp-millis"
},
{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 0
},
"string",
"int",
"null"
]
}
]
}
这是我目前使用的代码:
import avro.schema
schema = avro.schema.parse(open('my_msg.avsc', 'rb').read())
【问题讨论】:
-
你在哪里定义 MilestoneField 对象?
标签: python apache-kafka avro