【问题标题】:confused with spark groupBy与火花组混淆
【发布时间】:2018-08-27 17:11:46
【问题描述】:

我是 scala 和 spark 的新手,我想在 hive 表中过滤一些重复的记录,我选择了 spark。

我对kotlin很熟悉,所以我用kotlin来描述逻辑:

data class Bean(val id: Int, val name: String, val time: String)

val data = listOf(
        Bean(1, "1", "20180101 00:00:00"),
        Bean(1, "2", "20180101 00:00:01"),
        Bean(2, "3", "20180101 00:00:02"),
        Bean(2, "4", "20180101 00:00:03"),
        Bean(3, "5", "20180101 00:00:04")
)

val result = data.groupBy { it.id }.map { (_, v) -> v.maxBy { it.time } }

result.forEach { println(it)}

//result

Bean(id=1, name=2, time=20180101 00:00:01)
Bean(id=2, name=4, time=20180101 00:00:03)
Bean(id=3, name=5, time=20180101 00:00:04)

hive中有很多不同列的表,但它们都有'id'&'time',记录可能有相同的id但不同的时间,我只需要相同id记录的最大时间。

spark.read.table(s"$dbName.$tableName")
    .groupBy($"id") // get a RelationalGroupedDataset
    ...
}

当我使用 groupBy 时,我得到一个 RelationalGroupedDataset 结果,我想知道下一步该怎么做?或者可能是错误的步骤,我该怎么办?

【问题讨论】:

  • 只在 groupBy 之后使用 .agg(max($"time")) ,你需要导入 spark functions.max
  • @Ramesh Maharjan,在 groupBy 之后使用 .agg(max($"time")),然后数据框只有 2 个 columus- id & max("time"),我需要完整的记录,就像我的 kotlin 示例
  • 你应该使用窗口函数

标签: apache-spark


【解决方案1】:

你有几个选择。

1)。制作一个结构体,最大化它,然后解压它。

结构体像元组一样被比较为最大函数,所以时间必须作为第一个字段。

spark.read.table(s"$dbName.$tableName")
    .withColumn("v", struct($"time", $"name"))
    .groupBy($"id").agg(max($"v"))
    .select($"id", $"v.name", $"v.time")

2)。使用窗口函数并找到每个条目的第一行。

我忘记了确切的语法,但它类似于以下内容。

val w = Window.partitionBy($"id").orderBy($"time".desc)
spark.read.table(s"$dbName.$tableName")
    .withColumn($"num", row_number().over(w))
    .filter($"num" === 1)

【讨论】:

    猜你喜欢
    • 2017-01-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-08
    • 2017-08-04
    • 2016-03-03
    • 1970-01-01
    • 2014-07-09
    相关资源
    最近更新 更多