【问题标题】:Combine multiple columns into single column in SPARK在 SPARK 中将多列合并为单列
【发布时间】:2021-01-12 01:36:30
【问题描述】:

我的 parquet 文件中有以下格式的扁平化传入数据:

我想将其转换为以下格式,我不扁平化我的结构:

我尝试了以下方法:

Dataset<Row> rows = df.select(col("id"), col("country_cd"),
                explode(array("fullname_1", "fullname_2")).as("fullname"),
                explode(array("firstname_1", "firstname_2")).as("firstname"));

但它给出了以下错误:

线程 "main" org.apache.spark.sql.AnalysisException 中的异常:每个选择子句只允许一个生成器,但找到了 2 个:explode(array(fullname_1, fullname_2)), explode(array(firstname_1, firstname_2) ));

我理解这是因为您不能在查询中使用超过 1 个爆炸。 我正在寻找在 Spark Java 中执行上述操作的选项。

【问题讨论】:

    标签: apache-spark apache-spark-sql databricks


    【解决方案1】:

    使用.flatMap() 最容易解决此类问题。 .flatMap().map() 类似,只是它允许您为每个输入记录输出 n 条记录,而不是 1:1 的比率。

    val df = Seq(
        (1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
        (2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
        ).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")
    
    df.flatMap(row => {
        val id = row.getAs[Int]("id")
        val cc = row.getAs[String]("country_code")
        Seq(
            (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1")),
            (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1"))
        )
    }).toDF("id", "country_code", "fullname", "firstname").show()
    

    这会导致以下结果:

    +---+------------+-----------+---------+
    | id|country_code|   fullname|firstname|
    +---+------------+-----------+---------+
    |  1|         USA|      Lee M|      Lee|
    |  1|         USA|      Lee M|      Lee|
    |  2|         CAN|Pate Poland|     Pate|
    |  2|         CAN|Pate Poland|     Pate|
    +---+------------+-----------+---------+
    

    【讨论】:

    • 这个解决方案在 scala 中,但应该很容易移植。
    【解决方案2】:

    作为一个数据库人,我喜欢对这样的事情使用基于集合的操作,例如union

    val df = Seq(
      ("1", "USA", "Lee M", "Lee", "Dan A White", "Dan"),
      ("2", "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
    ).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")
    
    
    val df_new = df
      .select("id", "country_code", "fullname_1", "firstname_1").union(df.select("id", "country_code", "fullname_2", "firstname_2"))
      .orderBy("id")
    
    df_new.show
    df.createOrReplaceTempView("tmp")
    

    或等效的 SQL:

    %sql
    SELECT id, country_code, fullname_1 AS fullname, firstname_1 AS firstname
    FROM tmp
    UNION
    SELECT id, country_code, fullname_2, firstname_2
    FROM tmp
    

    我的结果:

    我想与 flatMap 技术相比的一个优点是您不必指定数据类型,而且从表面上看它看起来更简单。当然,这取决于你。

    【讨论】:

      【解决方案3】:

      您需要将名字和姓氏包装到一个结构数组中,然后再分解:

      Dataset<Row> rows = df.select(col("id"), col("country_cd"),
        explode(
          array(
            struct(
              col("firstname_1").as("firstname"), col("fullname_1").as("fullname")),
            struct(
              col("firstname_2").as("firstname"), col("fullname_2").as("fullname"))
          )
        )
      )
      

      通过这种方式,您将获得快速的窄转换,具有 Scala/Python/R 的可移植性,并且它应该比 df.flatMap 解决方案运行得更快,后者会将 Dataframe 转换为 RDD,而查询优化器无法改进。由于从不安全的字节数组复制到 java 对象,Java Garbage Collector 可能会有额外的压力。

      【讨论】:

        猜你喜欢
        • 2019-01-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-05-01
        相关资源
        最近更新 更多