【问题标题】:Array of JSON to Dataframe in Spark received by KafkaKafka 接收到的 Spark 中的 JSON 到 Dataframe 的数组
【发布时间】:2019-05-16 23:23:48
【问题描述】:

我正在使用 Spark Structured Streaming 在 Scala 中编写一个 Spark 应用程序,该应用程序从 Kafka 接收一些以 JSON 样式格式化的数据。此应用程序可以接收以这种方式格式化的单个或多个 JSON 对象:

[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]

我试图定义一个 StructType 像:

var schema = StructType(
                  Array(
                        StructField("key1",DataTypes.StringType),
                        StructField("key2",DataTypes.StringType)
             ))

但它不起作用。 我解析 JSON 的实际代码:

var data = (this.stream).getStreamer().load()
  .selectExpr("CAST (value AS STRING) as json")
  .select(from_json($"json",schema=schema).as("data"))

我想在类似的数据框中获取此 JSON 对象

+----------+---------+
|      key1|     key2|
+----------+---------+
|    value1|   value2|
|    value1|   value2|
        ........
|    value1|   value2|
+----------+---------+

有人可以帮帮我吗? 谢谢!

【问题讨论】:

  • 在转换为 JSON 之前,爆炸你的数组,它应该可以工作。
  • 参考此链接,stackoverflow.com/questions/48361177/… 我无法发表评论,因为我还没有到达那里..
  • @Sc0rpion,架构始终相同。结构是问题
  • @vindev 我试过了,还是不行
  • @Vinc 你能分享你的尝试吗?你遇到了什么错误?

标签: json scala apache-spark spark-streaming-kafka


【解决方案1】:
  1. 您可以将 ArrayType 添加到您的架构中,而 from_json 将 将数据转换为 json。
var schema = ArrayType(StructType(
                  Array(
                        StructField("key1", DataTypes.StringType),
                        StructField("key2", DataTypes.StringType)
             )))
  1. 展开得到每一行的json数组元素。
val explodedDf = df.withColumn("jsonData", explode(from_json(col("value"), schema)))
.select($"jsonData").show
+----------------+
|        jsonData|
+----------------+
|[value1, value2]|
|[value3, value4]|
+----------------+
  1. 选择 json 键
explodedDf.select("jsonData.*").show
+------+------+
|  key1|  key2|
+------+------+
|value1|value2|
|value3|value4|
+------+------+

【讨论】:

    【解决方案2】:

    这对我来说在 Spark 3.0.0 和 Scala 2.12.10 中运行良好。我使用 schema_of_json 以适合 from_json 的格式获取数据的架构,并在链的最后一步应用了 explode 和 * 运算符以进行相应的扩展。

    // TO KNOW THE SCHEMA
    scala> val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")
    str: Seq[String] = List([{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}])
    
    scala> val df = str.toDF("json")
    df: org.apache.spark.sql.DataFrame = [json: string]
    
    scala> df.show()
    +--------------------+
    |                json|
    +--------------------+
    |[{"key1":"value1"...|
    +--------------------+
    
    scala> val schema = df.select(schema_of_json(df.select(col("json")).first.getString(0))).as[String].first
    schema: String = array<struct<key1:string,key2:string>>
    

    使用生成的字符串作为您的架构:'array',如下所示:

    // TO PARSE THE ARRAY OF JSON's
    scala> val parsedJson1 = df.selectExpr("from_json(json, 'array<struct<key1:string,key2:string>>') as parsed_json")
    parsedJson1: org.apache.spark.sql.DataFrame = [parsed_json: array<struct<key1:string,key2:string>>]
    
    scala> parsedJson1.show()
    +--------------------+
    |         parsed_json|
    +--------------------+
    |[[value1, value2]...|
    +--------------------+
    
    scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json").select("json.*")
    data: org.apache.spark.sql.DataFrame = [key1: string, key2: string]
    
    scala> data.show()
    +------+------+
    |  key1|  key2|
    +------+------+
    |value1|value2|
    |value3|value4|
    +------+------+
    

    仅供参考,没有星形扩展,中间结果如下所示:

    scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json")
    data: org.apache.spark.sql.DataFrame = [json: struct<key1: string, key2: string>]
    
    scala> data.show()
    +----------------+
    |            json|
    +----------------+
    |[value1, value2]|
    |[value3, value4]|
    +----------------+
    

    【讨论】:

      【解决方案3】:

      由于您的传入字符串是JSONArray,一种方法是编写UDF 来解析Array,然后分解解析的Array。以下是解释每个步骤的完整代码。我已经为批处理编写了它,但同样可以用于流式传输,只需极少的更改。

      object JsonParser{
      
        //case class to parse the incoming JSON String
        case class JSON(key1: String, key2: String)
      
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.
            builder().
            appName("JSON").
            master("local").
            getOrCreate()
      
          import spark.implicits._
          import org.apache.spark.sql.functions._
      
          //sample JSON array String coming from kafka
          val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")
      
          //UDF to parse JSON array String
          val jsonConverter = udf { jsonString: String =>
            val mapper = new ObjectMapper()
            mapper.registerModule(DefaultScalaModule)
            mapper.readValue(jsonString, classOf[Array[JSON]])
          }
      
          val df = str.toDF("json") //json String column
            .withColumn("array", jsonConverter($"json")) //parse the JSON Array
            .withColumn("json", explode($"array")) //explode the Array
            .drop("array") //drop unwanted columns
            .select("json.*") //explode the JSON to separate columns
      
          //display the DF
          df.show()
          //+------+------+
          //|  key1|  key2|
          //+------+------+
          //|value1|value2|
          //|value3|value4|
          //+------+------+
      
        }
      }
      

      【讨论】:

        猜你喜欢
        • 2017-12-21
        • 1970-01-01
        • 2020-11-08
        • 1970-01-01
        • 2021-02-21
        • 2018-10-05
        • 2015-09-13
        • 2020-01-15
        • 2017-10-18
        相关资源
        最近更新 更多