【问题标题】:Spark Dataset : data transformationSpark 数据集:数据转换
【发布时间】:2020-06-10 19:12:08
【问题描述】:

我有一个格式为 -

的 Spark 数据集
+--------------+--------+-----+
|name          |type    |cost |
+--------------+--------+-----+
|AAAAAAAAAAAAAA|XXXXX   |0.24|
|AAAAAAAAAAAAAA|YYYYY   |1.14|
|BBBBBBBBBBBBBB|XXXXX   |0.78|
|BBBBBBBBBBBBBB|YYYYY   |2.67|
|BBBBBBBBBBBBBB|ZZZZZ   |0.15|
|CCCCCCCCCCCCCC|XXXXX   |1.86|
|CCCCCCCCCCCCCC|YYYYY   |1.50|
|CCCCCCCCCCCCCC|ZZZZZ   |1.00|
+--------------+--------+----+

我想把它转换成一个类型的对象 -

public class CostPerName {
    private String name;
    private Map<String, Double> costTypeMap;
}

我想要的是,

+--------------+-----------------------------------------------+
|name          |           typeCost.                           |
+--------------+-----------------------------------------------+
|AAAAAAAAAAAAAA|(XXXXX, 0.24), (YYYYY, 1.14)                   |            
|BBBBBBBBBBBBBB|(XXXXX, 0.78), (YYYYY, 2.67), (ZZZZZ, 0.15)    |
|CCCCCCCCCCCCCC|(XXXXX, 1.86), (YYYYY, 1.50), (ZZZZZ, 1.00)    |
+--------------+-----------------------------------------------+

即,对于每个name,我想要一张(type, cost) 的地图。

实现这种转变的有效方法是什么?我可以使用一些数据帧转换吗?我尝试了 groupBy 但这只有在我执行 sum、avg 等聚合查询时才有效。

【问题讨论】:

    标签: apache-spark apache-spark-sql apache-spark-dataset


    【解决方案1】:

    您可以将 type 和 cost 两列组合成一个新的 struct 列,然后按名称分组并使用 collect_list 作为聚合函数:

    df.withColumn("type_cost", struct("type", "cost"))
         .groupBy("name").agg(collect_list("type_cost"))
    

    这将产生一个像这样的数据框:

    +--------------+---------------------------------------------+
    |name          |collect_list(type_cost)                      |
    +--------------+---------------------------------------------+
    |AAAAAAAAAAAAAA|[[XXXXX, 0.24], [YYYYY, 1.14]]               |
    |CCCCCCCCCCCCCC|[[XXXXX, 1.86], [YYYYY, 1.50], [ZZZZZ, 1.00]]|
    |BBBBBBBBBBBBBB|[[XXXXX, 0.78], [YYYYY, 2.67], [ZZZZZ, 0.15]]|
    +--------------+---------------------------------------------+
    

    【讨论】:

    • 谢谢,这很好用。所以,@mazaneicha 的回答是。只是这样它可以帮助我和其他在这里绊倒的人,你能告诉我你是如何深入理解 spark sql 的吗?我之所以问,是因为我确实浏览了 spark 文档,但想不出这种方法。
    • 请注意,使用此解决方案typeCost 会变成列表而不是地图。除此之外,很好的答案!
    • @mazaneicha 你是对的。对于 Spark 版本 >= 2.4,您的答案更接近问题
    • @maddie 尝试在第一步之后运行 printSchema。然后可以看到新列 type_cost 是一个结构体。然后在聚合中收集此结构
    【解决方案2】:

    如果您的 Spark 版本允许,您可以使用 map_from_arrays()

    scala> val df2 = df.groupBy("name").agg(map_from_arrays(collect_list($"type"), collect_list($"cost")).as("typeCost"))
    df2: org.apache.spark.sql.DataFrame = [name: string, typeCost: map<string,decimal(3,2)>]
    
    scala> df2.printSchema()
    root
     |-- name: string (nullable = false)
     |-- typeCost: map (nullable = true)
     |    |-- key: string
     |    |-- value: decimal(3,2) (valueContainsNull = true)
    
    scala> df2.show(false)
    +--------------+---------------------------------------------+
    |name          |typeCost                                     |
    +--------------+---------------------------------------------+
    |AAAAAAAAAAAAAA|[XXXXX -> 0.24, YYYYY -> 1.14]               |
    |CCCCCCCCCCCCCC|[XXXXX -> 1.86, YYYYY -> 1.50, ZZZZZ -> 1.00]|
    |BBBBBBBBBBBBBB|[XXXXX -> 0.78, YYYYY -> 2.67, ZZZZZ -> 0.15]|
    +--------------+---------------------------------------------+
    
    scala>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-14
      • 2016-12-12
      • 2017-09-05
      • 2020-10-14
      • 2021-11-05
      • 2021-08-13
      相关资源
      最近更新 更多