【问题标题】:Process each json record in json RDD using Spark with Scala使用 Spark 和 Scala 处理 json RDD 中的每个 json 记录
【发布时间】:2018-08-19 04:37:57
【问题描述】:

我需要以下情况的帮助:

我将从 Kafka 获取以下 JSON 格式的数据以触发流式传输

{"id" : 1 , "data" : "AFGH00101219"}
{"id" : 2 , "data" : "AFGH00101215"}
{"id" : 2 , "data" : "AFGH00101216"}
{"id" : 3 , "data" : "AFGH00101218"}

val messages= KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

现在我想处理消息中的每条 JSON 记录,然后每条记录依次返回一组记录。请给我一些想法来完成以下任务。

val output = messages.map(row =>
{
//here I will get each json record. My doubt is how to extract id and data 
//filed values from row and store it into variables.
//Here I need to decode the data filed value which is in hexa decimal format 
//to decimal format.
}

提前致谢。如果问题不清楚,请告诉我。

【问题讨论】:

  • 您可以实现自己的实际将 JSON 解析为对象的方法,而不是使用 StringDecoder

标签: scala apache-spark apache-kafka spark-streaming


【解决方案1】:

您可以使用Google GSON 或任何 JSON 解析库,我曾使用如下的 Google GSON 来解析我在 spark 流中收到的 JSON 数据。

// loop each RDD 
lines.foreachRDD(rawRDD => {
    val rdd = rawRDD.filter(!_.isEmpty)
      .map(row => {
        val jobj = new Gson().fromJson(row, classOf[JsonObject])
        val id = jobj.getAsJsonObject("id").getAsString
        val data = jobj.getAsJsonObject("data").getAsString
        // Do something with id and data
      })
  })

另一种方法是从接收到的RDD创建一个Dataframe

lines.foreachRDD(rawRDD => {
  val rdd = rawRDD.filter(!_.isEmpty)
  val df = spark.read.json(rdd)
  df.show(false)
  })

这会从 rdd 创建一个数据框,如下所示,不,您可以将 id 和 data 用于任何其他转换/操作。

+------------+---+
|data        |id |
+------------+---+
|AFGH00101219|1  |
|AFGH00101215|2  |
|AFGH00101216|2  |
|AFGH00101218|3  |
+------------+---+

我希望这会有所帮助!

【讨论】:

  • 理想情况下,您不应该为每一行都创建一个新的 Gson 反序列化器
  • @cricket_007 编写我们自己的解析器?如果我们想从 json 中删除一些元素怎么办。
  • @cricket_007 是否使用自定义解析器提高了性能,我在我的项目中使用过 GSON,所以如果改进的话,一定要使用自己的。
  • 我认为您误解了我的评论。您可以扩展 Kafka 的 StringDecoder,或者实现自己的将 String 或 ByteArray 记录转换为 Java 对象或 JSONObjects。 Spark 有一个内置的 JSON 解析器,所以我不明白你为什么要使用 Gson 开头
  • 感谢您的回复。我对上述答复几乎没有疑问。我们是否需要使用 GSON。如果没有,我可以接近数据框的概念吗?但是如果我遵循数据框,我将如何从 json 中迭代每条记录,如 RDBMS 中的游标。请告诉我。
猜你喜欢
  • 1970-01-01
  • 2016-05-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-17
  • 1970-01-01
  • 2021-03-19
相关资源
最近更新 更多