您可以尝试为json 文件构建一个架构。
我不知道你期望什么输出。
作为线索,我给你一个例子和两个有趣的链接:
spark-read-json-with-schema
spark-schema-explained-with-examples
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}
object RareJson {
val spark = SparkSession
.builder()
.appName("RareJson")
.master("local[*]")
.config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
.config("spark.app.id","RareJson") // To silence Metrics warning
.getOrCreate()
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
val input = "/home/cloudera/files/tests/rare.json"
def main(args: Array[String]): Unit = {
Logger.getRootLogger.setLevel(Level.ERROR)
try {
val structureSchema = new StructType()
.add("field1",StringType)
.add("field2",StringType)
.add("value",StringType,true)
val rareJson = sqlContext
.read
.option("allowBackslashEscapingAnyCharacter", true)
.option("allowUnquotedFieldNames", true)
.option("multiLine", true)
.option("mode", "DROPMALFORMED")
.schema(structureSchema)
.json(input)
rareJson.show(truncate = false)
// To have the opportunity to view the web console of Spark: http://localhost:4041/
println("Type whatever to the console to exit......")
scala.io.StdIn.readLine()
} finally {
sc.stop()
println("SparkContext stopped")
spark.stop()
println("SparkSession stopped")
}
}
}
输出
+------+------+---------------------------+
|field1|field2|value |
+------+------+---------------------------+
|d1 |app |{"data":"{\"app\":\"am\"}"}|
+------+------+---------------------------+
如果 value 列在所有行中保持相同的格式,您也可以尝试解析它。