【问题标题】:How to use withColumn to aggregate data from another table?如何使用 withColumn 从另一个表中聚合数据?
【发布时间】:2021-06-04 21:31:36
【问题描述】:

我有一张如下表 -

table_a
id|start_date|end_date
1|2020-01-01|2020-01-05
...

我想使用下表在此数据框中创建一个聚合列

table_b
id|val|date
1|5.0|2020-01-01
1|2.0|2020-01-03
1|6.0|2020-01-08
2|5.0|2020-01-01
3|5.0|2020-01-01

我想要 table_a 中所有 table_b 值总和的聚合列,其中 table_b 的 date 列介于 table_a 的开始日期和结束日期之间。这将如何实现?

所以这个最终的 table_a 将是 -

table_a
id|start_date|end_date|agg_col
1|2020-01-01|2020-01-05|7.0
...

一个后续问题 -

如何根据不同的因素计算多个聚合? (特别是标志和日期范围)我考虑过使用分桶,但来自table_b 的行将落入多个重叠范围。

table_a
id|start_date|end_date
1|2020-01-01|2020-01-05
1|2020-01-03|2020-01-07
1|2020-01-04|2020-01-08
2|2020-01-01|2020-01-05
2|2020-01-03|2020-01-07
2|2020-01-04|2020-01-08
table_b
id|val|date|flag_a|flag_b
1|10.0|2019-12-15|0|0
1|15.0|2019-12-25|0|0
1|5.0|2020-01-01|0|0
1|2.0|2020-01-03|0|4
1|6.0|2020-01-08|3|4
1|4.0|2020-01-18|3|4
2|5.0|2020-01-01|0|3
3|5.0|2020-01-01|0|2

最终输出将具有许多不同的总和聚合列,所有这些列都基于不同的因素。一些可能的聚合可能是 -

table_b date between (START_DATE - 30 days to START_DATE - 15 days)
table_b date between (START_DATE - 180 days to START_DATE - 90 days)
table_b date between (START_DATE - 10 days to START_DATE +5 days AND flag_a=0)
table_b date between (END_DATE + 5 days to END_DATE +20 days AND flag_b=3)

火花函数解决方案会是更快的纯 sql 解决方案,看起来像这样吗?性能是一个巨大的因素,table_a 有 100m 行,table_b 有接近 50b 行。我将运行大约 50 个不同的聚合。

select *, 
    (select sum(val) from table_b b
    where date > a.start_date - 180
    and date < a.start_date - 90
    and a.id = b.id ) as val_sum_180_90,
    ...
from table_a a

这样的解决方案?

【问题讨论】:

  • 500 亿行的表?我希望它以某种方式进行分区。这是您想要的最糟糕的连接情况,因为它是范围连接。知道大小会有所帮助,因为它说最大的痛点是在 table_b 上。因此,最后需要进行预聚合,并且您将切换连接顺序(希望 table_b 与 table_a 连接而不是相反)。
  • 哦等等,其实没那么难(我想)!让我再试一次。
  • @kanielc 是按日期列分区的!
  • 信息量很大,所以如果你按照我在下面建议的使用 faster_b 进行操作,那么生成的表格实际上会非常小。在这种情况下,您可以尝试在连接中翻转 faster_b 和 table_a。

标签: scala apache-spark


【解决方案1】:

首先你必须执行一个连接操作并将它们分组:

import org.apache.spark.sql.functions._

val tableA = spark.read.[...]
val tableB = spark.read.[...]

val df = tableA.as("tableA").join(tableB.as("tableB"), expr("tableA.id = tableB.id AND tableB.date BETWEEN tableA.start_date AND tableA.end_date"), "inner")

val dfGroup = df.groupBy("tableA.id").agg(min("tableA.start_date").as("start_date"), 
                                          max("tableA.end_date").as("end_date"), 
                                          sum("val").as("agg_col"))
dfGroup.show(false)

输出

+---+----------+----------+-------+
|id |start_date|end_date  |agg_col|
+---+----------+----------+-------+
|1  |2020-01-01|2020-01-05|7.0    |
+---+----------+----------+-------+

【讨论】:

  • 是否有日期条件在聚合函数中而不是在连接级别?我也想获得不同范围的聚合,所以 start_date - 90 天到 start_date - 30 天, start_date-30days 到 start_date 等等。并且每个都有不同的列。我可以加入最大范围,因此 start_date - 90 天到 end_date + 90 天,然后在总和级别过滤它吗?
  • @asdasd32 我建议您更新您的问题以显示此场景和所需的输出,以使您的问题更容易可视化
【解决方案2】:

您可以首先根据您的条件(在此特定情况下为 table_b.date BETWEEN table_a.start_date AND table_a.end_date)连接两个表,然后按 table_a.id 分组,如下面的查询中执行所需的聚合。

SELECT table_a.id, FIRST(table_a.start_date) AS start_date, FIRST(table_a.end_date) AS end_date, SUM(table_b.val) AS total
FROM table_a
     LEFT JOIN table_b ON table_b.date BETWEEN table_a.start_date AND table_a.end_date
GROUP BY table_a.id

使用您的示例,结果是:

+---+----------+----------+-----+
| id|start_date|  end_date|total|
+---+----------+----------+-----+
|  1|2020-01-01|2020-01-05| 17.0|
+---+----------+----------+-----+

查询在 Spark SQL 中,但可以轻松转换为 Dataframe API:

tableA.join(tableB, col("date").between(col("start_date"), col("end_date")), "left")
            .groupBy(tableA.col("id"))
            .agg(
                    first("start_date").as("start_date"),
                    first("end_date").as("end_date"),
                    sum("val").as("total")
            ).show();

【讨论】:

  • 在您的 scala 示例中需要在连接期间创建别名,否则您的 groupBy 将引发错误,因为您的数据框将有不明确的列
  • 嗨@Kafels,唯一不明确的列是id,它已经在groupBy() 中被引用为tableA.col("id"),因此即使在连接中没有别名也应该可以正常工作
  • 是的,是的,我没有注意到您在分组操作中选择了tableA.col("id")
【解决方案3】:

根据补充的信息,我想我们现在对情况有了更多的了解。

我会把代码放在下面,然后是解释

val faster_b = table_b.groupBy('id, 'date).agg(sum('val) as "val") // 1
faster_b.join(table_a, Seq("id")) // 2
 .where('date.between('start_date, 'end_date))  // 3 
 .groupBy("id", "start_date", "end_date").agg(sum('val)) // 4

说明:
1 - 预聚合 table_b 以减少连接中的重复次数
2 - 删除 table_a 中没有匹配项的范围,这应该没问题。由于我们在数字列上加入,应该很快。
3 - 加入后进行过滤。这里的权衡是连接时有更多内存,但速度要快得多
4 - 进行最后的聚合

输出

scala> res.show(false)
+---+----------+----------+---+
|id |start_date|end_date  |val|
+---+----------+----------+---+
|1  |2020-01-01|2020-01-05|7.0|
+---+----------+----------+---+

在这里拯救我们的东西实际上是id,因为它很容易加入。我们在这里放弃了更多的内存,但是把 join 变成了一个非常快速的 equi-join。

试一试,告诉我结果如何。实际上可以将其概括为一次处理多个聚合(而不是过滤,创建一个回答聚合问题的列)。如果需要,我可以提供有关该想法的更多信息。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-10-20
    • 1970-01-01
    • 2011-06-17
    • 2013-07-10
    • 2017-07-15
    • 1970-01-01
    • 2016-09-24
    • 2020-06-17
    相关资源
    最近更新 更多