【发布时间】:2018-03-08 20:40:55
【问题描述】:
我有一个如下所示的数据集(“guid”、“timestamp”、“agt”)
val df = List(Test("a", "1", null),
Test("b", "2", "4"),
Test("a", "1", "3"),
Test("b", "2", "4"),
Test("c", "1", "3"),
Test("a", "6", "8"),
Test("b", "2", "4"),
Test("a", "1", "4")
我需要计算
- 按 guid 分组时每行的最小时间戳。
- 按(guid、时间戳)分组时每个键的计数
- 行的 agtM 按 guid 分组并按时间戳 (desc) 排序然后取第一个非空 agt else ""
- 删除重复项
所以输出将如下所示。
+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
| c| 1| 3| 1| 1| 3|
| b| 2| 4| 2| 3| 4|
| a| 1| | 1| 3| 8|
| a| 6| 8| 1| 1| 8|
+----+---------+---+-------+-----+----+
我试过了
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val gg = df.toDS()
.withColumn("minimum", min("timestamp").over(w))
.withColumn("count", count("*").over(w1))
.withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
.dropDuplicates("guid", "timestamp")
不过,我对 agtM 计算不是很自信。我的目标是实现最小的改组,因为在这种情况下,我们首先按 guid 分组,然后按(guid,时间戳)分组,从逻辑上讲,第二个分组应该发生在第一个创建的分区中。然后输出按 guid 分组并与另一个表连接。这两个数据都非常庞大(以 TB 为单位),因此希望通过最少的改组来实现这一点,并且不想稍后在 mapGroups 中移动计算(我可以简单地通过使用非空代理时间和 maxBy 过滤组来完成 agtM 计算时间戳)。您能否建议实现上述目标的最佳方法?
编辑
agtM 计算已修复。为了给前面的操作提供更多的上下文,输出和另一个数据集(一个额外的字段,我们在输出中保持虚拟)的联合将需要按键分组以产生最终结果。我也在考虑在每个分区(mapPartitions)内计算这些值(除了窗口w),然后将每个分区内的列表作为另一个列表并进行进一步计算。
【问题讨论】:
-
您的
w2窗口规范似乎没有做任何与您列出的agtM要求相关的事情,这应该用第一个非空agt在降序中前向填充agtM时间戳顺序。但是,您预期的8对应于""的输出似乎表明您实际上想用最后一个非空agt回填? -
这行
| a| 1| | 1| 3| 8|不应该是| a| 1| 3| 1| 2| 4|吗? -
@LeoCyou 是对的。我想在遍历列表以获取按时间戳排序的 guid 时回填最后一个非空 agt。另一种计算方法是
df.toDS().filter(_.agt != "").groupByKey(r => r.guid).mapGroups((a, b) => { val agtMObject = b.maxBy(p => p.timestamp) TestWithagtM(agtMObject.guid, agtMObject.timestamp, agtMObject.agt, agtMObject.agtM) })@RameshMaharjan(a, 1, "") 是输入,最后一列应该是 8,因为如果你 groupBy a 然后按时间戳 desc 排序,那么 8 是对应的 agt 6将成为agtM
标签: scala apache-spark apache-spark-sql apache-spark-dataset