【问题标题】:How to use the functions.explode to flatten element in dataFrame如何使用functions.explode来展平dataFrame中的元素
【发布时间】:2017-10-12 17:28:43
【问题描述】:

我已经编写了这段代码:

case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

object TestSparkDataFrame extends App{

  System.setProperty("hadoop.home.dir", "E:\\Programmation\\Libraries\\hadoop")
  val conf = new SparkConf().setAppName("TestSparkDataFrame").set("spark.driver.memory","4g").setMaster("local[*]")
  val session = SparkSession.builder().config(conf).getOrCreate()

  import session.implicits._

  def createAndPrintSchemaRawPanda(session:SparkSession):DataFrame = {
    val newPanda = RawPanda(1,"M1B 5K7", "giant", true, Array(0.1, 0.1))
    val pandaPlace = PandaPlace("torronto", Array(newPanda))
    val df =session.createDataFrame(Seq(pandaPlace))
    df
  }
  val df2 = createAndPrintSchemaRawPanda(session)
  df2.show

+--------+--------------------+
|    name|              pandas|
+--------+--------------------+
|torronto|[[1,M1B 5K7,giant...|
+--------+--------------------+


  val pandaInfo = df2.explode(df2("pandas")) {
    case Row(pandas: Seq[Row]) =>
      pandas.map{
        case (Row(
          id: Long,
          zip: String,
          pt: String,
          happy: Boolean,
          attrs: Seq[Double])) => RawPanda(id, zip, pt , happy,      attrs.toArray)
      }
  }

  pandaInfo2.show

+--------+--------------------+---+-------+-----+-----+----------+
|    name|              pandas| id|    zip|   pt|happy|attributes|
+--------+--------------------+---+-------+-----+-----+----------+
|torronto|[[1,M1B 5K7,giant...|  1|M1B 5K7|giant| true|[0.1, 0.1]|
+--------+--------------------+---+-------+-----+-----+----------+

我使用的 explode 函数已被弃用,所以我想重新计算 pandaInfo2 数据框,但使用警告中建议的方法。

将 flatMap() 或 select() 与 functions.explode() 一起使用

但是当我这样做时:

 val pandaInfo = df2.select(functions.explode(df("pandas"))

我获得了与 df2 中相同的结果。 我不知道如何继续使用 flatMap 或 functions.explode。

如何使用 flatMap 或 functions.explode 获得我想要的结果?(pandaInfo 中的那个)

我见过this postthis other one,但没有一个能帮到我。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    使用explode 函数调用select 会返回一个DataFrame,其中数组pandas 被“分解”为单独的记录;然后,如果您想“扁平化”每条记录生成的单个“RawPanda”的结构,您可以使用点分隔的“路线”选择各个列:

    val pandaInfo2 = df2.select($"name", explode($"pandas") as "pandas")
      .select($"name", $"pandas",
        $"pandas.id" as "id",
        $"pandas.zip" as "zip",
        $"pandas.pt" as "pt",
        $"pandas.happy" as "happy",
        $"pandas.attributes" as "attributes"
      )
    

    完全相同的操作的不那么冗长的版本是:

    import org.apache.spark.sql.Encoders // going to use this to "encode" case class into schema
    val pandaColumns = Encoders.product[RawPanda].schema.fields.map(_.name)
    
    val pandaInfo3 = df2.select($"name", explode($"pandas") as "pandas")
      .select(Seq($"name", $"pandas") ++ pandaColumns.map(f => $"pandas.$f" as f): _*)
    

    【讨论】:

    • 太好了,什么是编码器,你怎么知道你必须使用它?
    • Encoder 是 Spark 使用反射来找出适合表示给定 class 的 DataFrame schema 的方法。还有其他方法可以从类中推断模式,请参阅stackoverflow.com/q/36746055/5344058 了解更多选项。
    猜你喜欢
    • 2018-09-02
    • 2018-10-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-09-03
    相关资源
    最近更新 更多