【Posted】: 2021-11-22 01:35:27
【Question】:
I have a nested pydantic schema, DocumentSchema, in a FastAPI project. It is defined as follows:
class DocumentSchema(BaseModel):
    clientName: str
    transactionId: str
    documentList: List[SingleDocumentSchema]
and
class SingleDocumentSchema(BaseModel):
    documentInfo: DocumentInfoSchema
    articleList: List[DocumentArticleSchema]
and
class DocumentInfoSchema(BaseModel):
    title: str
    type: str
    referenceId: int
    batchNoList: Optional[List]
    otherData: Optional[Json]
and
class DocumentArticleSchema(BaseModel):
    type: str
    value: int
    accountType: Optional[AccountTypeEnums]
    accountId: Optional[int]
    otherData: Optional[Json]
This is the Python snippet that receives a message from Kafka and processes it:
def process(self) -> bool:
    try:
        DocumentSchema(
            **json.loads(self._message)
        )
        return self._process()
    except ValidationError as e:
        raise UnprocessableEntityException(e, self._topic)
    except ValueError as e:
        raise UnprocessableEntityException(e, self._topic)
    except Exception as e:
        raise UnprocessableEntityException(e, self._topic)
But for the input
{
    "clientName": "amazon",
    "transactionId": "e3e60ca3-7eb1-4a55-ae35-c43f9b2ea3fd",
    "documentList": [
        {
            "documentInfo": {
                "title": "New Order",
                "type": "order",
                "referenceId": 19488682
            },
            "articleList": [
                {
                    "type": "product_price",
                    "value": 1350,
                    "otherData": {
                        "weight": "4 kg"
                    }
                }
            ]
        }
    ]
}
it reports a validation error:
{"message":"1 validation error for DocumentSchema\ndocumentList -> 0 -> articleList -> 0 -> otherData\n JSON object must be str, bytes or bytearray (type=type_error.json)"}
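The wording of this error closely resembles what Python's own json.loads raises when it is handed an already-decoded object instead of JSON text. A minimal sketch using only the standard library, assuming (this is not confirmed in the question) that the Json field type tries to decode its input as a JSON string; try_decode is a hypothetical helper introduced only for illustration:

```python
import json

# After json.loads(self._message), "otherData" is already a dict, not JSON text.
other_data = {"weight": "4 kg"}


def try_decode(value):
    """Hypothetical helper: return (decoded value, None) or (None, error text)."""
    try:
        return json.loads(value), None
    except TypeError as exc:
        return None, str(exc)


# Decoding a dict fails: json.loads only accepts str, bytes or bytearray.
decoded_dict, dict_error = try_decode(other_data)

# Decoding the same data serialized as a string succeeds.
decoded_text, text_error = try_decode(json.dumps(other_data))
```

If that assumption holds, the nested object under otherData is rejected because it has already been decoded by the outer json.loads call.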
I should mention that everything works fine without otherData.
I don't know how to solve it.
Thanks in advance.
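One possible direction, sketched under the assumption that otherData always arrives as a nested object (as in the input above) rather than as a JSON-encoded string, is to declare the field as a plain dictionary. ArticleWithJson and ArticleWithDict are hypothetical names used only for this illustration, and the explicit `= None` defaults are an addition so the sketch behaves the same across pydantic versions:

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel, Json, ValidationError


class ArticleWithJson(BaseModel):
    # Mirrors the original field: Json expects a JSON-encoded string.
    otherData: Optional[Json] = None


class ArticleWithDict(BaseModel):
    # Hypothetical alternative: accept an already-decoded object.
    otherData: Optional[Dict[str, Any]] = None


payload = {"otherData": {"weight": "4 kg"}}

# The Json-typed field rejects a dict value.
try:
    ArticleWithJson(**payload)
    json_field_rejected = False
except ValidationError:
    json_field_rejected = True

# The dict-typed field accepts the same payload unchanged.
article = ArticleWithDict(**payload)

# The Json-typed field does accept the value when it is a JSON string.
from_string = ArticleWithJson(otherData='{"weight": "4 kg"}')
```

If the producer instead sends otherData as a JSON-encoded string, keeping the Json type would likely be the better fit.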
【Comments】:
Tags: python json fastapi pydantic