【问题标题】:Parsing Event Hub Complex Array Type messages using spark streaming使用火花流解析事件中心复杂数组类型消息
【发布时间】:2021-07-01 21:58:48
【问题描述】:

我需要在从 eventthub 读取时解析正文中的数组类型。我们嵌套了 json 消息,但无法解析相同的消息:

{"Name": "Rohit","Salary": "29292","EmpID": 12,"Projects": [{"ProjectID": "9191","ProjectName": "abc","Duration ": "79"},{"ProjectID": "9192","ProjectName": "xyz","Duration": "75"}]}

我正在尝试使用以下方式修改架构:但似乎存在一些问题。

val testSchema = new StructType()
  .add("Name", StringType)
  .add("Salary", StringType)
  .add("EmpID", StringType)      
  .add("Projects", new ArrayType(new StructType()
    .add("ProjectID", StringType)
    .add("ProjectName", StringType)
    .add("Duration", StringType)))

任何帮助将不胜感激。

【问题讨论】:

    标签: scala apache-spark databricks spark-structured-streaming azure-eventhub


    【解决方案1】:

    如果没有确切的错误很难说,但看起来您在架构定义中有错误 - 您需要修改架构以添加标志指定数组元素是否可以为空(请参阅嵌套后的 true 标志结构类型)。当直接读取 JSON 或从字符串转换它时,以下模式可以正常工作:

    val testSchema = new StructType()
      .add("Name", StringType)
      .add("Salary", StringType)
      .add("EmpID", StringType)      
      .add("Projects", new ArrayType(new StructType()
        .add("ProjectID", StringType)
        .add("ProjectName", StringType)
        .add("Duration", StringType), true))
    
    scala> spark.read.schema(testSchema).json("file.json").show(truncate=false)
    +-----+------+-----+----------------------------------+
    |Name |Salary|EmpID|Projects                          |
    +-----+------+-----+----------------------------------+
    |Rohit|29292 |12   |[{9191, abc, 79}, {9192, xyz, 75}]|
    +-----+------+-----+----------------------------------+
    
    scala> import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._
    
    scala> val df = spark.read.text("file.json")
    df: org.apache.spark.sql.DataFrame = [value: string]
    
    scala> df.select(from_json($"value", testSchema)).show(truncate=false)
    +------------------------------------------------------+
    |from_json(value)                                      |
    +------------------------------------------------------+
    |{Rohit, 29292, 12, [{9191, abc, 79}, {9192, xyz, 75}]}|
    +------------------------------------------------------+
    

    【讨论】:

    • 感谢 Alex 非常及时的回答,架构定义有效。
    • 我有更复杂的情况,我们需要解析嵌套数组模式,然后将其展平以推入目标突触数据库。 {“姓名”:“Rohit”,“工资”:“29292”,“EmpID”:12,“项目”:[{“项目ID”:“9191”,“项目名称”:“abc”,“工期”:“ 79","Location" :[{"City" : "Delhi", "State" : "DEL"},{"City" : "BANG", "State" : "KR" }]},{"ProjectID" :“9192”,“项目名称”:“xyz”,“持续时间”:“75”,“位置”:[{“城市”:“BANG”,“州”:“KR”},{“城市”:“ CH", "State" : "TN" }]}]} 你能建议一下吗?
    • 您可以使用explode函数将数组元素转换为单独的行
    • 感谢您的回复,我在为嵌套数组定义架构时遇到错误。你能帮忙在上面的例子中定义嵌套数组的模式吗 val testSchema = new StructType() .add("Name", StringType) .add("Salary", StringType) .add("EmpID", StringType) .add( "Projects", new ArrayType(new StructType() .add("ProjectID", StringType) .add("ProjectName", StringType) .add("Duration", StringType), true) .add("Location", new ArrayType (new StructType() .add("City", StringType) .add("State", StringType), true))))
    • 问题出在这里:` .add("Duration", StringType), true) .add("Location", new ArrayType(` - 你需要在第二个右括号之前先移动, true最后...
    猜你喜欢
    • 1970-01-01
    • 2022-01-08
    • 2019-04-10
    • 2016-08-13
    • 2014-10-27
    • 1970-01-01
    • 2015-01-08
    • 2018-08-09
    • 2018-07-27
    相关资源
    最近更新 更多