【问题标题】:Json file data into kafka topicJson文件数据导入kafka主题
【发布时间】:2021-07-06 06:09:50
【问题描述】:

如何使用 kafka-console-producer 将 json 文件数据插入 kafka 主题? 每个json数据集可以存储为消息吗?

例子-

{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}

如果你使用这个命令-

cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic  stream-test-topic

每一行都被视为一个单独的消息。

这样做的正确方法是什么?

谢谢!

ps-

Elastic 搜索正在读取该主题。示例 json 消息文件 -

[{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}, {
  "id": 2,
  "first_name": "Peter",
  "last_name": "Friz",
  "email": "Friz3@gmail.com",
  "gender": "Male",
  "ip_address": "4.5.6.7"
}, {
  "id": 3,
  "first_name": "Dell",
  "last_name": "Chang",
  "email": "Dellc@gmail.com",
  "gender": "Female",
  "ip_address": "8.9.10.11"
}, {
  "id": 4,
  "first_name": "Lolita",
  "last_name": "John",
  "email": "LolitaJ@gmail.com",
  "gender": "Female",
  "ip_address": "12.13.14.15"
}, {
  "id": 5,
  "first_name": "Pele",
  "last_name": "Wang",
  "email": "Pele@gmail.com",
  "gender": "Male",
  "ip_address": "16.17.18.19"
}, {
  "id": 6,
  "first_name": "Rene",
  "last_name": "Charm",
  "email": "Rene3@gmail.com",
  "gender": "Male",
  "ip_address": "20.21.22.23"
}]

【问题讨论】:

  • 你的消息分隔符是什么?简单地创建一个新的 JSON 对象? JSON 来自哪里?
  • 用示例 json 文件更新了我的问题。
  • 所以您有一个包含单个 JSON 数组的文件,并且您想将该数组分解为单独的消息?对吗?
  • 和;这些数据来自哪里?几乎可以肯定,这里更好的模式是构建 JSON 数组并写入该文件的过程,而不是将其直接发送到 Kafka
  • 如果您真的想从文件中获取一些虚拟数据并将其塞入主题以进行测试,那么只需将其从数组中取出,并在每一行放置一个对象。就是这样。

标签: apache-kafka


【解决方案1】:

如果文件中有 JSON 消息,可以使用以下方式在 kafka 主题中写入:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json

Kafka 生产者使用默认的LineMessageReader 逐行读取消息。默认键和值序列化程序是StringSerializer。它不会验证是否存在正确的 json,而是将原始字符串对象视为发布到 kafka 主题。但是如果你想验证你可以在控制台生产者命令中定义下面的配置。

key.serializer
value.serializer

例子:

kafka-console-producer --broker-list localhost:9092 --topic testTopic--property value.serializer=custom.class.serialization.JsonSerializer 

在消费者方面,您可以采用类似的方法。使用 JsonDeserializer 读取数据。

【讨论】:

  • 我发现这个对我不起作用,即使我也尝试使用 key.serializer。
【解决方案2】:

从 Kafka 的角度来看,每条消息都是字节数组。 这取决于客户的应用程序(生产者、消费者等)如何处理它。 Kafka Producer、Consumer 使用 Deserializer、Serializer 将字节数组转换为业务对象(字符串、POJO)

您面临的问题是 Kafka 控制台生产者从标准输入读取消息的方式。 默认情况下,它使用LineMessageReader,它将每一行视为新消息。您可以实现自己的,或者在将 json 中的每个换行符发送到其他空格之前。

例如,您可以使用以下命令:

jq -rc . sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic

【讨论】:

    【解决方案3】:

    我也是 Kafka 的新手,和你有同样的用例。经过一些研发后,我找到了一个可能对您有所帮助的简短答案。您可以编写如下内容:

    bin/kafka-console-producer --broker-list localhost:9092 --topic blogpost
    {"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}.
    

    更详细的查看click here

    【讨论】:

      【解决方案4】:

      这个答案对于那些正在寻找kafka控制台生产者发送key和value为json的json数据的人会有所帮助。

      命令

      ./bin/kafka-console-producer.sh --broker-list localhost:9092 --property parse.key=true --property key.separator="&" --topic test-topic
      

      JSON

      {"key":"1"}&{"name":"emp1","sent_at":1625519962875}
      

      【讨论】:

        【解决方案5】:

        您可以通过管道将 Json 传递给主题:

        echo '{"test": 1}' | bin/kafka-console-producer --broker-list localhost:9092 --topic test-topic
        

        【讨论】:

          猜你喜欢
          • 2020-09-14
          • 2019-07-30
          • 2018-05-07
          • 2022-01-03
          • 1970-01-01
          • 2019-07-25
          • 2018-09-16
          • 2021-10-21
          • 2018-12-23
          相关资源
          最近更新 更多