【问题标题】:Read Json file and post to Kafka topic - What is the best approach?阅读 Json 文件并发布到 Kafka 主题 - 最好的方法是什么?
【发布时间】:2021-09-24 11:36:26
【问题描述】:

我有一组 json 文件,其内容与示例类似。我需要读取 json 文件并使用 Python 脚本将其内容发布到 Kafka 主题。有人可以用最好的方法帮助我吗?还是逐行读取文件并将其发布到 Kafka 是唯一可能的方法?

文件.json

{"L1K1": "v1", "L1K2" : "v2", "L1K3" : "v3"}

{"L2K1": "v1", "L2K2" : "v2", "L2K3" : "v3"}

{"L3K1": "v1", "L3K2" : "v2", "L3K3" : "v3"}

{"L4K1": "v1", "L4K2" : "v2", "L4K3" : "v3"}

【问题讨论】:

    标签: python apache-kafka kafka-producer-api


    【解决方案1】:

    你需要 Python 吗? kafka-console-producer ... < file.json 将为每一行生成一条新消息。

    否则,是的,你想要这样的东西

    import kafka
    
    p = kafka.Producer(...)
    
    with open('file.json') as f:
        for line in f:
            p.send('topic', value=line.strip())
    p.flush()
    

    【讨论】:

      【解决方案2】:

      使用 Kafka Connect。将数据摄取到 Kafka 正是它的设计目的,它是 Apache Kafka 的一部分。

      两个都支持提取 JSON 的连接器:

      【讨论】:

      • 我们没有 KC 设置。(以前从未使用过)。必须使用 Python KafkaProducer 来完成。此外,对于它产生的每一行(消息),我们需要跟踪数据库中的偏移量和其他详细信息。这样,我们就可以定期监控生成了多少消息、失败了多少、消息来自哪个文件等。
      • 我的意思是,如果你想重新发明轮子...... ;) Kafka Connect 可以完成所有这些事情(可靠地跟踪偏移量、摄取进度等)。要使用它,您需要运行 worker(Apache Kafka 附带的二进制文件),安装相关连接器,并使用 JSON 配置文件对其进行配置。
      猜你喜欢
      • 2017-10-18
      • 2011-01-24
      • 2020-01-04
      • 2018-05-21
      • 2018-10-28
      • 1970-01-01
      • 2017-10-19
      • 1970-01-01
      • 2010-09-13
      相关资源
      最近更新 更多