【问题标题】:Spark collect limited sorted listSpark收集有限排序列表
【发布时间】:2020-05-11 21:12:51
【问题描述】:

我正在尝试使用 spark 为数据帧创建一个有限的排序列表,但是我想不出一种快速且低内存的方法。

我的数据框由三列、两个键 ID 和一个距离列组成,我想获取靠近每个 ID 的前 n=50 个 ID 的列表。我尝试 groupBy 然后是 collect_list 然后是 sort_array,然后是 UDF 以仅获取 ID,最后通过 UDF 传递它以获取第一个 n=50,但它非常慢,有时会出现内存错误。

# Sample Data
val dataFrameTest = Seq(
      ("key1", "key2", 1),
      ("key1","key3", 2),
      ("key1", "key5" ,4),
      ("key1", "key6" ,5),
      ("key1","key8" ,6),
      ("key2", "key7" ,3),
      ("key2", "key9" ,4),
      ("key2","key5" ,5)
      ).toDF("id1", "id2", "distance")

如果限制是 2 想要

"key1" | ["key2", "key3"]    
"key2" | ["key7", "key8"]

当前方法:

sorted_df = dataFrameTest.groupBy("key1").agg(collect_list(struct("distance", "id2")).alias("toBeSortedCol")).
withColumn("sortedList", sort_array("toBeSortedCol"))

我的数据非常大,所以 spark 是唯一的解决方案。感谢任何帮助/指导。

【问题讨论】:

  • 你的是一个图数据,也许你可以利用 Spark-GraphX 来处理它。如果 key 的唯一数量(key1 和 key2 )都不太高,可以考虑将数据集转换为 Distance-Matrix,然后进行处理,en.wikipedia.org/wiki/Distance_matrix

标签: scala apache-spark memory aggregation collect


【解决方案1】:

为此使用 Spark SQL 窗口函数之一怎么样?类似的东西

scala> val dataFrameTest = Seq(
     |       ("key1", "key2", 1),
     |       ("key1","key3", 2),
     |       ("key1", "key5" ,4),
     |       ("key1", "key6" ,5),
     |       ("key1","key8" ,6),
     |       ("key2", "key7" ,3),
     |       ("key2", "key9" ,4),
     |       ("key2","key5" ,5)
     |       ).toDF("id1", "id2", "distance")
dataFrameTest: org.apache.spark.sql.DataFrame = [id1: string, id2: string ... 1 more field]

scala> dataFrameTest.createOrReplaceTempView("sampledata")

scala> spark.sql("""
     | select t.id1, collect_list(t.id2) from (
     | select id1, id2, row_number() over (partition by id1 order by distance) as rownum from sampledata
     | )t
     | where t.rownum < 3 group by t.id1
     | """).show(false)
+----+-----------------+
|id1 |collect_list(id2)|
+----+-----------------+
|key1|[key2, key3]     |
|key2|[key7, key9]     |
+----+-----------------+

scala>

只需将row_number() 替换为rank()dense_rank(),具体取决于您需要的结果类型。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-12-09
    • 1970-01-01
    • 2021-11-11
    • 1970-01-01
    • 1970-01-01
    • 2016-06-23
    • 2018-04-21
    • 2011-01-08
    相关资源
    最近更新 更多