【问题标题】:Spark: get groupBy output as list of rowsSpark:将groupBy输出作为行列表
【发布时间】:2019-12-03 14:28:01
【问题描述】:

在 spark 中使用 group by 时可以得到Dataset<List<Row>>

作为输出。这里 Row 是原始行。

Dataset<<List<Row>> output = dataset.groupBy("key");

如果使用聚合并且collect_list then 在输出行中,则无法保证列表格式的值是有序的。因此,在我的情况下,这不是一个好的解决方案。

例如:聚合输出。但是不能保证设定值的顺序。

+-----+----------------------------+
|item1|set                         |
+-----+----------------------------+
|1    |[[5,3], [4,1], [3,2], [2,2]]|
|2    |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+ 

请告知是否有办法在不使用 pojos 的情况下获得 Dataset&lt;List&lt;Row&gt;&gt; 的输出。 (例如基于 pojo 的解决方案:FlatMapGroupsWIthStateFunction

【问题讨论】:

  • 我什至会说这是真的“不能保证有序” 对于任何分布式大型数据集问题。您希望对聚合值应用什么顺序?为什么不使用任何sort* 标准函数?

标签: java apache-spark spark-streaming spark-structured-streaming


【解决方案1】:

我已经开始为您提供解决方案。您可以使用monotonically_increasing_id 创建索引并“记住”数据框的顺序。然后,您可以按键分组,将结果与collect_list 聚合,按索引对列表进行排序,最后将其删除。

SparkSQL 中有一个sort_array 函数可以对数组进行排序。不幸的是,我不知道 sparkSQL 数组上有任何等效的 map 函数来删除索引。这就是我提出基于 UDF 的解决方案的原因:

// the UDF that sorts by the index "i" and keeps the value
val sort_and_strip = udf{ (x : WrappedArray[Row]) =>
    x.sortBy(_.getAs[Long]("i"))
     .map(_.getAs[Long]("value"))
}

// an example of use:
spark.range(7)
    .select('id % 3 as "key", 'id as "value")
    .withColumn("i", monotonically_increasing_id)
    .groupBy("key")
    .agg(collect_list(struct('i, 'value)) as "list")
    .withColumn("list", sort_and_strip('list))
    .show(false)
+---+---------+
|key|list     |
+---+---------+
|0  |[0, 3, 6]|
|1  |[1, 4]   |
|2  |[2, 5]   |
+---+---------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-10-31
    • 1970-01-01
    • 2015-06-04
    • 2021-12-02
    • 2016-12-06
    • 1970-01-01
    • 1970-01-01
    • 2021-11-14
    相关资源
    最近更新 更多