【问题标题】:How to post a kafka schema using python如何使用 python 发布 kafka 模式
【发布时间】:2021-03-19 06:06:50
【问题描述】:

我正在尝试使用 python 发布一个 kafka 架构。

在 CLI 中,我会使用如下语法:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"VisualDetections\",\"namespace\":\"com.namespace.something\",\"fields\":[{\"name\":\"vehicle_id\",\"type\":\"int\"},{\"name\":\"source\",\"type\":\"string\"},{\"name\":\"width\",\"type\":\"int\"},{\"name\":\"height\",\"type\":\"int\"},{\"name\":\"annotated_frame\",\"type\":[\"string\",\"null\"]},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"fps\",\"type\":\"int\"},{\"name\":\"mission_id\",\"type\":\"int\"},{\"name\":\"sequence\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"sequence_record\",\"fields\":[{\"name\":\"frame_id\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"localization\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"localization_record\",\"fields\":[{\"name\":\"latitude\",\"type\":\"double\"},{\"name\":\"longitude\",\"type\":\"double\"},{\"name\":\"class\",\"type\":\"string\"},{\"name\":\"object_id\",\"type\":\"int\"},{\"name\":\"confidence\",\"type\":\"double\"},{\"name\":\"bbox\",\"type\":{\"type\":\"record\",\"name\":\"bbox\",\"fields\":[{\"name\":\"x_min\",\"type\":\"int\"},{\"name\":\"y_min\",\"type\":\"int\"},{\"name\":\"x_max\",\"type\":\"int\"},{\"name\":\"y_max\",\"type\":\"int\"}]}}]}}}]}}}]}"}' http://server_ip:8081/subjects/VisualDetections-value/versions/

当我尝试将此函数转移到 python 时,我尝试了类似的方法:

import requests
import json

topic = 'VisualDetections'
headers = {'Content-Type':  'application/vnd.schemaregistry.v1+json'}
with open(avro_path) as fp:
     data = {'schema': json.load(fp)}
data_json = json.dumps(data)
cmd = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
response = requests.post(cmd, headers=headers, data=data_json)

以上返回代码{"error_code":500,"message":"Internal Server Error"}。我尝试过其他选项,例如:

with open(avro_path) as fp:
    data = json.load(fp)

带有错误代码:

"error_code":422,"message":"Unrecognized field: name"
    

在上面的avro_path 只是在一个 json 文件中包含了 avro 模式(如果有用也可以上传)。

我不确定如何准确发布这些数据。另外,我没有考虑 CLI 中 post 的 -H 参数,因为我找不到等效的 python 参数(但不确定它是否起任何作用)。谁能提供这个问题的解决方案。

【问题讨论】:

  • headers=headers-H 相同...但是如果可以的话,您可能需要查看服务器日志,以查看您发送的数据的确切错误500 错误
  • curl 中的-H 选项只定义了一个标头,在本例中为Content-Type: application/vnd.schemaregistry.v1+json。您已经在使用 .

标签: python rest apache-kafka confluent-schema-registry


【解决方案1】:

对于第二个错误,payload 需要是{'schema': "schema string"}

首先,我认为这是编码的问题; json.load 会将文件读入字典而不是字符串。

通知

>>> import json
>>> schema = {"type":"record"}  # example when using json.load() ... other data excluded
>>> json.dumps({'schema': schema})
'{"schema": {"type": "record"}}'  # the schema value is not a string
>>> json.dumps({'schema': json.dumps(schema)})
'{"schema": "{\\"type\\": \\"record\\"}"}'  # here it is

尝试只读取文件

url = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
with open(avro_path) as fp:
     data = {'schema': fp.read().strip()}
     response = requests.post(cmd, headers=headers, data=json.dumps(data))

否则,您将json.load 然后使用json.dumps 两次,如上所示

你也可以试试json=data而不是data=json.dumps(data)

【讨论】:

    猜你喜欢
    • 2019-12-17
    • 1970-01-01
    • 2020-01-02
    • 1970-01-01
    • 1970-01-01
    • 2017-01-24
    • 2019-02-03
    • 2017-12-21
    • 1970-01-01
    相关资源
    最近更新 更多