【问题标题】:Collect rows as list with group by apache spark通过apache spark将行收集为列表
【发布时间】:2018-12-09 23:54:29
【问题描述】:

我有一个特殊的用例,我有多个行用于同一客户,每个行对象看起来像:

root
 -c1: BigInt
 -c2: String
 -c3: Double
 -c4: Double
 -c5: Map[String, Int]

现在我已经按列 c1 进行分组,并将所有行收集为同一客户的列表,例如:

c1, [Row1, Row3, Row4]
c2, [Row2, Row5]

我试过这样做 dataset.withColumn("combined", array("c1","c2","c3","c4","c5")).groupBy("c1").agg(collect_list("combined")) 但我得到一个例外:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'array(`c1`, `c2`, `c3`, `c4`, `c5`)' due to data type mismatch: input to function array should all be the same type, but it's [bigint, string, double, double, map<string,map<string,double>>];;

【问题讨论】:

  • 你是在建议元组吗?你能帮我举个例子吗。我也只想将它们收集为 Rows 对象。
  • 这是我的错。请粘贴您的测试数据。会检查

标签: java scala apache-spark apache-spark-sql spark-streaming


【解决方案1】:

如果您希望结果由 Rows 的集合组成,请考虑按如下方式转换为 RDD:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

def df = Seq(
    (BigInt(10), "x", 1.0, 2.0, Map("a"->1, "b"->2)),
    (BigInt(10), "y", 3.0, 4.0, Map("c"->3)),
    (BigInt(20), "z", 5.0, 6.0, Map("d"->4, "e"->5))
  ).
  toDF("c1", "c2", "c3", "c4", "c5").
  // as[(BigInt, String, Double, Double, Map[String, Int])]

df.rdd.map(r => (r.getDecimal(0), r)).groupByKey.collect
// res1: Array[(java.math.BigDecimal, Iterable[org.apache.spark.sql.Row])] = Array(
//   (10,CompactBuffer([10,x,1.0,2.0,Map(a -> 1, b -> 2)], [10,y,3.0,4.0,Map(c -> 3)])),
//   (20,CompactBuffer([20,z,5.0,6.0,Map(d -> 4, e -> 5)]))
// )

或者,如果您对 DataFrame 中 struct-type 行的集合很满意,这里有另一种方法:

val cols = ds.columns

df.groupBy("c1").agg(collect_list(struct(cols.head, cols.tail: _*)).as("row_list")).
  show(false)
// +---+----------------------------------------------------------------+
// |c1 |row_list                                                        |
// +---+----------------------------------------------------------------+
// |20 |[[20,z,5.0,6.0,Map(d -> 4, e -> 5)]]                            |
// |10 |[[10,x,1.0,2.0,Map(a -> 1, b -> 2)], [10,y,3.0,4.0,Map(c -> 3)]]|
// +---+----------------------------------------------------------------+

【讨论】:

    【解决方案2】:

    您可以使用struct 函数来组合列并使用groupBycollect_list 聚合函数来代替array

    import org.apache.spark.sql.functions._
    df.withColumn("combined", struct("c1","c2","c3","c4","c5"))
        .groupBy("c1").agg(collect_list("combined").as("combined_list"))
        .show(false)
    

    这样您就有了 分组数据集,其中 schema

    root
     |-- c1: integer (nullable = false)
     |-- combined_list: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- c1: integer (nullable = false)
     |    |    |-- c2: string (nullable = true)
     |    |    |-- c3: string (nullable = true)
     |    |    |-- c4: string (nullable = true)
     |    |    |-- c5: map (nullable = true)
     |    |    |    |-- key: string
     |    |    |    |-- value: integer (valueContainsNull = false)
    

    希望回答对你有帮助

    【讨论】:

      猜你喜欢
      • 2020-05-26
      • 2016-05-30
      • 2020-05-11
      • 2021-09-17
      • 2021-11-11
      • 1970-01-01
      • 1970-01-01
      • 2021-02-23
      • 2021-03-06
      相关资源
      最近更新 更多