【发布时间】:2019-02-03 08:43:48
【问题描述】:
好的,我有一个包含列定义和相应序号位置的表。我正在使用 Spark (scala) 构建一个元数据驱动的 ETL 框架。我有一个包含以下信息的表:
- 表名
- 列名
- 数据类型
- 顺序位置
我必须从该数据构建一个 CREATE TABLE 语句。没什么大不了的,对吧?我尝试了看似标准的答案:
var metadatadef = spark.sql("SELECT tablename, columnname, datatype, ordinalposition FROM metadata")
.withColumn("columndef", concat($"columnname", lit(" "), $"datatype"))
.sort($"tablename", $"ordinalposition")
.groupBy("tablename")
.agg(concat_ws(", ", collect_list($"columndef")).as("columndefs"))
但是 sort() 调用似乎在这里被忽略了。或者在 collect_list() 和 concat_ws() 之间它被重新洗牌。给定这样的源数据:
+-----------+--------------+---------------+-----------------+
| tablename | columnname | datatype | ordinalposition |
+ ----------+--------------+---------------+-----------------+
| table1 | IntColumn | int | 0 |
| table2 | StringColumn | string | 2 |
| table1 | StringColumn | string | 2 |
| table2 | IntColumn | int | 0 |
| table1 | DecColumn | decimal(15,2) | 1 |
| table2 | DecColumn | decimal(15,2) | 1 |
+-----------+--------------+---------------+-----------------+
我试图得到这样的输出:
+-----------+----------------------------------------------------------------+
| tablename | columndefs |
+-----------+----------------------------------------------------------------+
| table1 | IntColumn int, DecColumn decimal(15,2), StringColumn string |
| table2 | IntColumn int, DecColumn decimal(15,2), StringColumn string |
+-----------+----------------------------------------------------------------+
相反,我得到了这样的结果:
+-----------+----------------------------------------------------------------+
| tablename | columndefs |
+-----------+----------------------------------------------------------------+
| table1 | IntColumn int, StringColumn string, DecColumn decimal(15,2) |
| table2 | StringColumn string, IntColumn int, DecColumn decimal(15,2) |
+-----------+----------------------------------------------------------------+
我是否需要构建 UDF 以确保获得正确的顺序?出于比较目的,我需要将输出放入数据框中,而不仅仅是构建 CREATE TABLE 语句。
【问题讨论】:
-
如果您想要自定义订单,可以使用 udf,但如果只需要相同的订单,您可以尝试
.agg(concat_ws(", ", sort_array(collect_list($"columndef"))).as("columndefs"))
标签: scala apache-spark