使用 org.json.XML 库将 XML 数据转换为 JSON。注意：Spark 默认不包含 org.json，需要先把它加入 classpath（例如启动 spark-shell 时指定 --packages org.json:json:<版本号>）。
请参考下面的代码。
创建一个 UDF，将 XML 字符串转换为 JSON 字符串：
scala> import org.json.XML
import org.json.XML
scala> val parse = udf((value: String) => Option(value).map(xml => XML.toJSONObject(xml).toString).orNull) // XML string -> JSON string. A null input yields null instead of an NPE; malformed XML still throws org.json.JSONException and fails the task.
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
根据 XML 数据的结构定义 schema（先用 Spark 的 JSON 格式描述，再用 DataType.fromJson 转换为 StructType）：
scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Spark DataType JSON for the converted document: struct employees -> struct employee -> building/room (long) and division, firstname, id, lastname, supervisor, title (string). NOTE(review): employee is a single struct, so this only matches one <employee> element; org.json turns repeated elements into a JSON array, which would need an ArrayType here - confirm against the real data.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}
scala> import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, StructType}
scala> val schema = DataType.fromJson(schema_json) match { case s: StructType => s; case other => throw new IllegalArgumentException(s"schema_json must describe a struct, got ${other.simpleString}") } // Parse the JSON schema; fail with a clear message instead of a ClassCastException.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))
查看最终的 schema：
scala> :paste
// Entering paste mode (ctrl-D to finish)

// NOTE(review): assumes inputStream is a streaming DataFrame with a "value" column (e.g. a Kafka source) - confirm.
inputStream
  .selectExpr("CAST(value AS STRING) AS data") // alias the payload so parse($"data") resolves; without the alias the column is named "value"
  .select(from_json(parse($"data"), schema).as("emp_data"))
  .select($"emp_data.employees.employee.*") // flatten the employee struct into top-level columns
  .printSchema()

// Exiting paste mode, now interpreting.

root
 |-- building: long (nullable = true)
 |-- division: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- room: long (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- title: string (nullable = true)
将从 JSON 解析出的数据写入控制台（console sink）：
scala> :paste
// Entering paste mode (ctrl-D to finish)

// NOTE(review): assumes inputStream is a streaming DataFrame with a "value" column (e.g. a Kafka source) - confirm.
inputStream
  .selectExpr("CAST(value AS STRING) AS data") // alias the payload so parse($"data") resolves; without the alias the column is named "value"
  .select(from_json(parse($"data"), schema).as("emp_data"))
  .select($"emp_data.employees.employee.*") // flatten the employee struct into top-level columns
  .writeStream
  .format("console")
  .option("truncate", false) // print full column values
  .outputMode("append")
  .start()
  .awaitTermination() // blocks the shell until the query is stopped or fails

// Exiting paste mode, now interpreting.