【问题标题】:Spark Scala - concatenate column in rows in a certain orderSpark Scala - 以特定顺序连接行中的列
【发布时间】:2019-02-03 08:43:48
【问题描述】:

好的,我有一个包含列定义和相应序号位置的表。我正在使用 Spark (scala) 构建一个元数据驱动的 ETL 框架。我有一个包含以下信息的表:

  • 表名
  • 列名
  • 数据类型
  • 顺序位置

我必须从该数据构建一个 CREATE TABLE 语句。没什么大不了的,对吧?我尝试了看似标准的答案:

var metadatadef = spark.sql("SELECT tablename, columnname, datatype, ordinalposition FROM metadata")
    .withColumn("columndef", concat($"columnname", lit(" "), $"datatype"))
    .sort($"tablename", $"ordinalposition")
    .groupBy("tablename")
    .agg(concat_ws(", ", collect_list($"columndef")).as("columndefs"))

但是 sort() 调用似乎在这里被忽略了。或者在 collect_list() 和 concat_ws() 之间它被重新洗牌。给定这样的源数据:

+-----------+--------------+---------------+-----------------+
| tablename | columnname   | datatype      | ordinalposition |
+ ----------+--------------+---------------+-----------------+
| table1    | IntColumn    | int           | 0               |
| table2    | StringColumn | string        | 2               |
| table1    | StringColumn | string        | 2               |
| table2    | IntColumn    | int           | 0               |
| table1    | DecColumn    | decimal(15,2) | 1               |
| table2    | DecColumn    | decimal(15,2) | 1               |
+-----------+--------------+---------------+-----------------+

我试图得到这样的输出:

+-----------+----------------------------------------------------------------+
| tablename | columndefs                                                     |
+-----------+----------------------------------------------------------------+
| table1    | IntColumn int, DecColumn decimal(15,2), StringColumn string    |
| table2    | IntColumn int, DecColumn decimal(15,2), StringColumn string    |
+-----------+----------------------------------------------------------------+

相反,我得到了这样的结果:

+-----------+----------------------------------------------------------------+
| tablename | columndefs                                                     |
+-----------+----------------------------------------------------------------+
| table1    | IntColumn int, StringColumn string, DecColumn decimal(15,2)    |
| table2    | StringColumn string, IntColumn int, DecColumn decimal(15,2)    |
+-----------+----------------------------------------------------------------+

我是否需要构建 UDF 以确保获得正确的顺序?出于比较目的,我需要将输出放入数据框中,而不仅仅是构建 CREATE TABLE 语句。

【问题讨论】:

  • 如果您想要自定义订单,可以使用 udf,但如果只需要相同的订单,您可以尝试 .agg(concat_ws(", ", sort_array(collect_list($"columndef"))).as("columndefs"))

标签: scala apache-spark


【解决方案1】:

您可以创建一个 (ordinalposition, columndef) 的struct 列,并应用sort_arraygroupBy 转换期间按所需顺序对聚合的columndef 进行排序,如下所示:

import org.apache.spark.sql.functions._

val df = Seq(
  ("table1", "IntColumn", "int", "0"),
  ("table2", "StringColumn", "string", "2"),
  ("table1", "StringColumn", "string", "2"),
  ("table2", "IntColumn", "int", "0"),
  ("table1", "DecColumn", "decimal(15,2)", "1"),
  ("table2", "DecColumn", "decimal(15,2)", "1")
).toDF("tablename", "columnname", "datatype", "ordinalposition")

df.
  withColumn("columndef",
    struct($"ordinalposition", concat($"columnname", lit(" "), $"datatype").as("cdef"))
  ).
  groupBy("tablename").agg(sort_array(collect_list($"columndef")).as("sortedlist")).
  withColumn("columndefs", concat_ws(", ", $"sortedlist.cdef")).
  drop("sortedlist").
  show(false)
// +---------+-----------------------------------------------------------+
// |tablename|columndefs                                                 |
// +---------+-----------------------------------------------------------+
// |table2   |IntColumn int, DecColumn decimal(15,2), StringColumn string|
// |table1   |IntColumn int, DecColumn decimal(15,2), StringColumn string|
// +---------+-----------------------------------------------------------+

【讨论】:

  • 耶!有效!感谢有关 sort_array 和结构的建议。
猜你喜欢
  • 1970-01-01
  • 2015-08-29
  • 1970-01-01
  • 2017-04-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多