【问题标题】:How to populate Map[string,Dataframe] as a column in a Dataframe in scala如何在 Scala 中将 Map[string,Dataframe] 填充为 Dataframe 中的列
【发布时间】:2022-01-02 00:15:16
【问题描述】:

我有一个Map[String, Dataframe]。我想将该 Map 中的所有数据合并到一个 Dataframe 中。数据框可以有 Map 数据类型的列吗?

def sample(dfs : Map[String,Dataframe]): Dataframe =
{
.........
}

例子:

DF1

id name age
1  aaa  23
2  bbb  34

DF2

game  time  score
ludo  10    20
rummy 30    40 

我将上述两个 DF 作为 Map 传递给函数。 然后将每个数据帧的数据以json格式放入输出数据帧的单列中。

外向型

+---------------------------------------------------------------------------------------+
| column1                                                                              |
+---------------------------------------------------------------------------------------+
| [{"id":"1","name":"aaa","age":"23"},{"id":21","name":"bbb","age":"24"}]               |
| [{"game":"ludo","time":"10","score":"20"},{"game":"rummy","time":"30","score":"40"}]  |
+---------------------------------------------------------------------------------------+

【问题讨论】:

  • 你能提供你想要的例子吗? \n 数据帧支持结构和数组类型而不是映射
  • 我想将多个数据帧的数据放到一个数据帧中。如果它不支持映射,那么我如何获取每个输入数据帧的值作为结构列输出数据帧
  • 数据帧应该如何组合?能举个例子吗?
  • 我已经用一个例子编辑了这个问题,请检查

标签: dataframe scala apache-spark dictionary


【解决方案1】:

您要求为每个数据帧生成一行。请注意,如果其中一个数据帧足够大以至于不能包含在单个执行程序中,则此代码将中断。

让我们首先生成数据和映射dfsMap[String, DataFrame]

val df1 = Seq((1, "aaa", 23), (2, "bbb", 34)).toDF("id", "name", "age")
val df2 = Seq(("ludo", 10, 20), ("rummy", 10, 40)).toDF("game", "time", "score")
dfs = Seq(df1, df2)

然后,对于地图的每个数据框,我们生成两列。 big_map 将数据框的每个列名与其值相关联(转换为字符串以具有一致的类型)。 df 只包含数据框的名称。然后,我们将所有数据帧与reduce 合并,并按name 分组(这是每个数据帧完全排成一行的部分,因此一个执行器)。

dfs
    .toSeq
    .map{ case (name, df) => df
        .select(map(
             df.columns.flatMap(c => Seq(lit(c), col(c).cast("string"))) : _*
        ) as "big_map")
        .withColumn("df", lit(name))}
    .reduce(_ union _)
    .groupBy("df")
    .agg(collect_list('big_map) as "column1")
    .show(false)
+---+-----------------------------------------------------------------------------------+
|df |column1                                                                            |
+---+-----------------------------------------------------------------------------------+
|df0|[{id -> 1, name -> aaa, age -> 23}, {id -> 2, name -> bbb, age -> 34}]             |
|df1|[{game -> ludo, time -> 10, score -> 20}, {game -> rummy, time -> 10, score -> 40}]|
+---+-----------------------------------------------------------------------------------+

【讨论】:

    【解决方案2】:

    这是针对您的用例的解决方案:

    import org.apache.spark.sql._
    
    def sample(dfs : Map[String, DataFrame])(implicit spark: SparkSession): DataFrame =
      dfs
        .values
        .foldLeft(spark.emptyDataFrame)((acc, df) => acc.union(df))
    

    需要 spark 会话来创建要折叠的空 DataFrame 累加器。

    或者,如果您可以保证 Map 不为空。

    def sample(dfs : Map[String, DataFrame]): DataFrame =
      dfs
        .values
        .reduce((acc, df) => acc.union(df))
    

    【讨论】:

    • 您能否检查一下我现在添加的示例的问题。我得到的每个输入数据帧都有不同的模式,所以我希望将整个输入数据帧的数据填充为一列,所以我的输出数据框在一列中包含每个输入数据框的数据
    猜你喜欢
    • 2017-02-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-04
    • 1970-01-01
    • 2020-12-31
    • 1970-01-01
    相关资源
    最近更新 更多