【问题标题】:How do I send JSON data to kafka using confluent's REST proxy?如何使用 confluent 的 REST 代理将 JSON 数据发送到 kafka?
【发布时间】:2019-02-08 10:54:10
【问题描述】:

对于我的学士学位论文,我正在尝试使用与 kafka 的 http 连接来发送机器数据(在本例中是使用 python 脚本发送的历史数据)。 我正在使用在 windows 系统上运行在 docker 中的融合平台。

我尝试使用 python 脚本将数据发送到 REST 代理。起初,我收到了有关我能够解决的数据类型的错误响应。

import pandas as pd
import csv, os, json, requests, time, datetime, copy, sys

if len(sys.argv) > 1:
    bgrfc_value = str(sys.argv[1])
else:
    print("No arguments for bgrfc given, defaulting to 'false'")
    bgrfc_value = 'false'

if len(sys.argv) > 2:
    filePath = str(sys.argv[2])
else:
    filePath = "path"


if len(sys.argv) > 3:
    batchSize = int(float(str(sys.argv[3])))
else:
    batchSize = 10


# Build skeleton JSON
basejson = {"message": {"meta" : "", "data": ""}}
#metajson = [{'meta_key' : 'sender', 'meta_value': 'OPCR'},
#           {'meta_key' : 'receiver', 'meta_value': 'CAT'},
#            {'meta_key' : 'message_type', 'meta_value': 'MA1SEK'},
#            {'meta_key' : 'bgrfc', 'meta_value': bgrfc_value}]
#basejson['message']['meta'] = metajson
url = "http://127.0.0.1:8082/"
headers = {'Content-Type':'application/json','Accept':'application/json'}

def assign_timestamps(batch):
    newtimestamps = []
    oldtimestamps = []

    # Batch timestamps to list, add 10 newly generated timestamps to a list
    for item in batch['tag_tsp'].values.tolist():
        newtimestamps.append(datetime.datetime.now())
        oldtimestamps.append(datetime.datetime.strptime(str(item), "%Y%m%d%H%M%S.%f"))

    # Sort old timestamps without sorting the original array to preserve variance
    temp = copy.deepcopy(oldtimestamps)
    temp.sort()
    mrtimestamp = temp[0]

    # Replicate variance of old timestamps into the new timestamps
    for x in range(batchSize):
        diff = mrtimestamp - oldtimestamps[x]
        newtimestamps[x] = newtimestamps[x] - diff
        newtimestamps[x] = newtimestamps[x].strftime("%Y%m%d%H%M%S.%f")[:-3]

    # Switch old timestamps with new timestamps
    batch['tag_tsp'] = newtimestamps
    return batch

# Build and send JSON, wait for a sec
def build_json(batch):
    assign_timestamps(batch)
    batchlist = []
    for index, row in batch.iterrows():
        batchlist.append(row.to_dict())

    basejson['message']['data'] = batchlist
    print(basejson)
    req = requests.post(url, json = json.loads(json.dumps(basejson)), headers = headers)
    print(req.status_code)
    time.sleep(1)

while(True):
    df = pd.read_csv(filePath, sep=";", parse_dates=[2], decimal=",", usecols = ['SENSOR_ID', 'KEP_UTC_TIME', 'VALUE'], dtype={'SENSOR_ID': object})
    df = df[::-1]
    df.rename(columns={'SENSOR_ID' : 'ext_id', 'KEP_UTC_TIME' : 'tag_tsp', 'VALUE' : 'tag_value_int'}, inplace=True)

    # Fill list with batches of 10 rows from the df
    list_df = [df[ i:i + batchSize] for i in range(0, df.shape[0], batchSize)]

    for batch in list_df:
        build_json(batch)

脚本发送数据,但作为响应,我得到状态码 500。

【问题讨论】:

  • 你知道Kafka有Python客户端库吗?
  • 我听说有用于 kafka 的 python 库,但我还没有将它们视为我的问题的解决方案,因为我的论文任务的定义是数据是通过 http 连接提供的。据我了解,kafka 的 python 库不会帮助我解决这个问题。如果我错了,请告诉我。
  • Python 库将使用原生 Kafka 协议,该协议比 HTTP 更高效。如果您必须使用 HTTP,请坚持使用 REST 代理。

标签: python json apache-kafka confluent-platform kafka-rest


【解决方案1】:

您的标头值不正确。您需要设置AcceptContent-type 两个标头,如下所示:

 Accept: application/vnd.kafka.v2+json
 Content-Type : application/vnd.kafka.json.v2+json

此外,数据应按以下方式结构化:

{"records":[{"value":{<Put your json record here>}}]}

例如:

{"records":[{"value":{"foo":"bar"}}]}

【讨论】:

  • 感谢您的回复。我已更改标题并尝试使用不同的脚本发送示例消息。控制台输出是 {'records': [{'key': 'somekey', 'value': {'foo': 'bar'}}, {'value': ['foo', 'bar'], 'partition': 1}, {'value': 53.5}]} 500 消息是从 confluents 文档中复制和粘贴的。知道为什么错误 500 仍然出现吗?
  • @LukasM 500 表示Internal Server Error,您的服务器端出了点问题,而不是您的客户端。您需要检查来自 REST 代理的日志
【解决方案2】:

我相信你放入“value”的数据一定是字符串。 这样的事情会起作用:

{"records":[{"value":"{'foo':'bar'}"}]}

如果您在阅读主题时收到一条有趣的消息,请尝试使用 base64 编码对您的消息进行编码。你原来的 json 字符串,编码后应该是这样的:

{"records":[{"value":"eyJmb28iOiJiYXIifQ=="}]}

【讨论】:

    猜你喜欢
    • 2022-11-10
    • 2018-11-28
    • 1970-01-01
    • 2020-06-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-17
    相关资源
    最近更新 更多