【发布时间】:2019-02-04 15:53:10
【问题描述】:
给定一个包含字段的数据源:product_id - product - start_time - end_time
我正在尝试使用 Dataframe 函数为相同的product(基于start_time 和end_time)构建捕获重叠记录的逻辑。
------------------------------------------------
| product_id | product | start_time | end_time |
------------------------------------------------
| 1 | bottle | 2 | 4 |
| 2 | bottle | 3 | 5 |
| 3 | bottle | 2 | 3 |
| 4 | bottle | 6 | 7 |
| 1 | can | 2 | 4 |
| 2 | can | 5 | 6 |
| 3 | can | 2 | 4 |
我想在输出中接收
-------------------------------------------------------------------------------------------------
| product_id_a | product_id_b | product | start_time_a | end_time_a | start_time_b | end_time_b |
-------------------------------------------------------------------------------------------------
| 1 | 2 | bottle | 2 | 4 | 3 | 5 |
| 1 | 3 | bottle | 2 | 4 | 2 | 3 |
因为bottle_1与bottle_2和bottle_3有重叠时间,如果满足以下条件,则有两条记录重叠:
max(a.start_time, b.start_time) < min(a.end_time, b.end_time)!(a.start_time == b.start_time && a.end_time == b.end_time)a.start_time != b.start_time || a.end_time != b.end_time
最后两个条件只是指定我对start_time 和end_time 相等的情况不感兴趣(例如can_1 和can_3 不在预期结果中,即使它们具有相同的@987654341 @ 和 end_time)。
问题的结构很容易想到使用 RDD 的 MapReduce 解决方案,但我对 Dataframes 的解决方案感兴趣。
提示:有没有可能用groupBy().agg() 指定一个有趣的条件来达到所描述的逻辑?
如有任何进一步的解释,请随时询问
不重复,属于How to aggregate over rolling time window with groups in Spark
不幸的是,在报告的答案中使用了F.lag,在我的情况下,这不是一个足够好的条件:F.lag 仅使用与以前记录的比较,但在报告的示例中无法按预期工作,因为那bottle_1 不会被报告为与 bottle_3 重叠,因为它们不是连续的记录
【问题讨论】:
标签: python apache-spark pyspark apache-spark-sql