【问题标题】:Sending a JSON file with multiple JSON objects through KAFKA通过 KAFKA 发送带有多个 JSON 对象的 JSON 文件
【发布时间】:2020-01-02 21:52:50
【问题描述】:

我有一个文件,其中包含以下格式的多个 json 文档。

{"attribute1": "value1", "attribute2": "value2", "attribute3": "value3", "attribute4": "value4"} {"attribute1": "value11", "attribute2": "value12", "attribute3": "value13", "attribute4": "value14"} {"attribute1": "value21", "attribute22": "value2", "attribute23": "value3", "attribute4": "value24"}

我正在尝试将单个 json 文档发送到 kafka。 该脚本以退出代码 0 执行,但我看不到 KAFKA 消费者上没有任何消息。我不确定我哪里出错了。

我的代码如下:

import csv
import json

bootstrap = ['hostname:9092']
valueSerializer = lambda x: dumps(x).encode('utf-8')

producer = KafkaProducer(bootstrap_servers = bootstrap, value_serializer = valueSerializer)

table = []
with open('~/json_file_name.json', 'r') as json_file:
    for line in json_file:
        table.append(json.loads(line))

#numrows = len(table)
#print(numrows)

for row in table:
    print(row)
    producer.send('Topic_Name', value=row)

【问题讨论】:

    标签: python json python-3.x apache-kafka kafka-python


    【解决方案1】:

    您可能没有为生产者发送足够的数据来刷新其批次。您尚未显示 KafkaProducer 的导入,但请查看是否可以在脚本末尾执行 producer.flush()


    顺便说一句,您不需要表变量,只需在读取文件行时发送即可。你也不需要dumps(x),因为你已经在发送json.loads获得的字符串了

    您也可以删除 csv 导入

    【讨论】:

    • 我在脚本末尾添加了 producer.flush() 并将数据发送到 Kafka
    猜你喜欢
    • 1970-01-01
    • 2018-10-15
    • 1970-01-01
    • 2019-08-28
    • 2014-10-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多