【问题标题】:Spark explode nested JSON with Array in ScalaSpark 在 Scala 中使用数组爆炸嵌套 JSON
【发布时间】:2016-11-09 16:02:45
【问题描述】:

假设我通过以下方式将 json 文件加载到 Spark 1.6 中

sqlContext.read.json("/hdfs/")

它给了我一个具有以下架构的数据框:

root
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- checked: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- color: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)

DF 只有一行,里面有一个包含我所有项目的数组。

+--------------------+--------------------+--------------------+
|                id_e|           checked_e|             color_e|
+--------------------+--------------------+--------------------+
|[0218797c-77a6-45...|[false, true, tru...|[null, null, null...|
+--------------------+--------------------+--------------------+

我想要一个将数组分解为每行一项的 DF。

+--------------------+-----+-------+
|                  id|color|checked|
+--------------------+-----+-------+
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
...

到目前为止,我通过从数组 DF 创建一个临时表并使用带有横向视图的 sql 来实现这一点。

val results = sqlContext.sql("
SELECT id, color, checked from temptable 
lateral view explode(checked_e) temptable as checked 
lateral view explode(id_e) temptable as id 
lateral view explode(color_e) temptable as color
")

有没有什么方法可以不使用 SQL 直接使用 Dataframe 函数实现这一点?我知道有类似 df.explode(...) 但我无法让它与我的数据一起使用

编辑:看起来爆炸并不是我真正想要的。 我想要一个新的数据框,它逐行包含数组的每个项目。与我的初始数据集相比,explode 函数实际上返回的行数更多。

【问题讨论】:

    标签: arrays json scala apache-spark explode


    【解决方案1】:

    以下解决方案应该有效。

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._
    
    val data = Seq((Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)))
    val df = sqlContext.createDataFrame(data)
    
    val udf3 = udf[Seq[(Int, Int, Int)], Seq[Int], Seq[Int], Seq[Int]]{
        case (a, b, c) => (a,b, c).zipped.toSeq
    }
    
    val df3 = df.select(udf3($"_1", $"_2", $"_3").alias("udf3"))
    val exploded = df3.select(explode($"udf3").alias("col3"))
    
    exploded.withColumn("first", $"col3".getItem("_1"))
        .withColumn("second", $"col3".getItem("_2"))
        .withColumn("third", $"col3".getItem("_3")).show
    

    虽然直接使用普通的 Scala 代码会更直接。它也可能更有效。如果只有一行,Spark 无论如何也无济于事。

    val data = Seq((Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)))
    val seqExploded = data.flatMap{
        case (a: Seq[Int], b: Seq[Int], c: Seq[Int]) => (a, b, c).zipped.toSeq
    }
    val dfTheSame=sqlContext.createDataFrame(seqExploded)
    dfTheSame.show
    

    【讨论】:

      【解决方案2】:

      应该像这样简单:

      df.withColumn("id", explode(col("id_e")))
        .withColumn("checked", explode(col("checked_e")))
        .withColumn("color", explode(col("color_e")))
      

      【讨论】:

      • 好吧,您的代码似乎与我对 sql 语句所做的一样,但是当我检查它时,我发现这种爆炸并不是我真正需要的。我的初始数据集是每个约 600 行。爆炸后我有大约 1.8 亿条线路。我真正想要的只是逐行取出数组的元素来创建一个新的数据框
      • 你是对的,我的回答产生笛卡尔积,看起来@Rockie Yang 解决方案应该以正确的方式做到这一点
      猜你喜欢
      • 1970-01-01
      • 2017-08-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-09
      • 2022-01-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多