[Question title]: Convert Streaming XML into JSON in Spark
[Posted]: 2020-10-04 08:27:06
[Question]:

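I am new to Spark and am building a simple application that converts an XML stream received from Kafka into JSON.

Using:

  • Spark 2.4.5
  • Scala 2.11.12

In my use case, the Kafka stream is in XML format. Below is the code I have tried.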


    import org.apache.spark.sql.SparkSession

    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Spark Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val inputStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo_topic_xml")
      .option("startingOffsets", "earliest") // From starting
      .load()

    inputStream.printSchema()


    val records = inputStream.selectExpr("CAST(value AS STRING)")
    // How can I drop the "value" wrapper here while converting the XML to JSON?
    val jsons = records.toJSON

    jsons.writeStream
      .format("console")
      .option("truncate", false)
      .outputMode("append")
      .start()
      .awaitTermination()

However, the code above puts the "value" column name into the JSON output as a field name, as shown below: {"value":"<?xml version=\"1.0\" encoding=\"utf-16\"?><employees><employee id=\"be129\"><firstname>Jane</firstname><lastname>Doe</lastname><title>Engineer</title><division>Materials</division><building>327</building><room>19</room><supervisor>be131</supervisor></employee><employees>"}

What I actually need is the XML payload converted to JSON without the "value" wrapper. It looks like I am missing something obvious here. Can someone help me? Thanks for your time.

[Comments]:

  • Did the solution below work?

Tags: scala apache-spark spark-streaming


[Answer 1]:

Use the org.json.XML library to convert the XML data to JSON.
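
Before wiring the conversion into Spark, it can help to try `XML.toJSONObject` on a single document. The snippet below is only a minimal sketch. It assumes the `org.json:json` artifact is on the classpath (it does not ship with Spark; the sbt coordinates and version in the comment are an assumption, so substitute the release you actually use). The sample is a shortened, well-formed variant of the employee record from the question, and the value names are illustrative.

```scala
// Assumed dependency (not bundled with Spark), e.g. in build.sbt:
//   libraryDependencies += "org.json" % "json" % "20200518"
import org.json.XML

// Shortened, well-formed variant of the payload from the question.
val sampleXml =
  """<employees><employee id="be129"><firstname>Jane</firstname><building>327</building></employee></employees>"""

// Elements become nested JSON objects; attributes become ordinary keys.
val sampleJson = XML.toJSONObject(sampleXml)
val employee = sampleJson.getJSONObject("employees").getJSONObject("employee")

println(employee.getString("id"))    // attribute value, kept as a string
println(employee.getInt("building")) // numeric-looking text is parsed as a number
println(sampleJson.toString(2))      // pretty-printed with a 2-space indent
```

Because numeric-looking text is converted to numbers, the schema defined further down declares `building` and `room` as long.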

See the code below.

Create the UDF

scala> import org.json.XML
import org.json.XML

scala> val parse = udf((value: String) => XML.toJSONObject(value).toString) // Defined UDF to parse xml to json
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
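
An exception thrown inside a UDF fails the streaming query, and `XML.toJSONObject` throws on malformed input such as an unclosed tag (the payload quoted in the question, for instance, ends with `<employees>` rather than `</employees>`). A more defensive variant could look like the sketch below. `xmlToJsonOrNull` and `parseSafe` are hypothetical names, and returning null for bad records is just one possible policy, not part of the original answer.

```scala
import scala.util.Try
import org.apache.spark.sql.functions.udf
import org.json.XML

// Hypothetical helper: null or malformed XML yields null instead of an exception.
def xmlToJsonOrNull(value: String): String =
  Option(value)
    .flatMap(v => Try(XML.toJSONObject(v).toString).toOption)
    .orNull

// Can be used in place of `parse`; from_json returns a null struct for null input.
val parseSafe = udf(xmlToJsonOrNull _)
```

Rows that fail to parse then show up as nulls in the output and can be filtered or routed elsewhere, rather than stopping the stream.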

Define the schema according to the XML data

scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Define Schema of your xml data in json.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

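scala> import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, StructType}

scala> val schema = DataType.fromJson(schema_json).asInstanceOf[StructType] // Convert Json schema data to schema.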
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))
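
The schema can also be built in code rather than parsed from a JSON string, which may be easier to read and maintain. The sketch below mirrors the definition above; the value names `employeeType` and `schemaFromCode` are illustrative only.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Fields of a single <employee> element; add(name, type) defaults to nullable = true.
val employeeType = new StructType()
  .add("building", LongType)
  .add("division", StringType)
  .add("firstname", StringType)
  .add("id", StringType)
  .add("lastname", StringType)
  .add("room", LongType)
  .add("supervisor", StringType)
  .add("title", StringType)

// Same nesting as the JSON definition: employees -> employee -> fields.
val schemaFromCode = new StructType()
  .add("employees", new StructType().add("employee", employeeType))
```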

Final schema

scala>
    inputStream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(parse($"value"), schema).as("emp_data")) // the cast column is still named "value"
    .select($"emp_data.employees.employee.*")
    .printSchema

root
 |-- building: long (nullable = true)
 |-- division: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- room: long (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- title: string (nullable = true)

Write the data converted to JSON to the console

scala> 
    inputStream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(parse($"value"), schema).as("emp_data")) // the cast column is still named "value"
    .select($"emp_data.employees.employee.*")
    .writeStream
    .format("console")
    .option("truncate", false)
    .outputMode("append")
    .start()
    .awaitTermination()
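
The console sink prints rows as a table, so the parsed columns appear as separate fields. If the goal is instead one JSON document per record with no `value` wrapper (for example, to write to files or back to Kafka), one option is to keep the UDF output as a single string column and skip `from_json` altogether. The following is a sketch under assumptions: `xmlToJson` and `toJsonStrings` are hypothetical names, and the input is expected to carry the Kafka source's `value` column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}
import org.json.XML

// Same conversion as the `parse` UDF above, redefined so the sketch is self-contained.
val xmlToJson = udf((value: String) => XML.toJSONObject(value).toString)

// Hypothetical helper: one string column whose content is the JSON document itself.
def toJsonStrings(kafkaDf: DataFrame): DataFrame =
  kafkaDf.select(xmlToJson(col("value").cast("string")).as("value"))
```

Sinks that accept a single string column, such as the `text` file format or the Kafka sink (which reads the `value` column), would then emit the bare JSON, e.g. `toJsonStrings(inputStream).writeStream.format("text")` with the `path` and `checkpointLocation` options set.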
