【发布时间】:2019-01-23 17:49:11
【问题描述】:
我有一个类似的数据框
| id | date | KPI_1 | ... | KPI_n
| 1 |2012-12-12 | 0.1 | ... | 0.5
| 2 |2012-12-12 | 0.2 | ... | 0.4
| 3 |2012-12-12 | 0.66 | ... | 0.66
| 1 |2012-12-13 | 0.2 | ... | 0.46
| 4 |2012-12-14 | 0.2 | ... | 0.45
| ...
| 55| 2013-03-15 | 0.5 | ... | 0.55
我们有
- X 标识符
- 给定日期的每个标识符对应一行
- n 个 KPI
我必须为每一行计算一些派生的 KPI,而这个 KPI 取决于每个 ID 的先前值。 假设我的派生 KPI 是一个差异,它将是:
| id | date | KPI_1 | ... | KPI_n | KPI_1_diff | KPI_n_diff
| 1 |2012-12-12 | 0.1 | ... | 0.5 | 0.1 | 0.5
| 2 |2012-12-12 | 0.2 | ... | 0.4 | 0.2 |0.4
| 3 |2012-12-12 | 0.66 | ... | 0.66 | 0.66 | 0.66
| 1 |2012-12-13 | 0.2 | ... | 0.46 | 0.2-0.1 | 0.46 - 0.66
| 4 |2012-12-13 | 0.2 | ... | 0.45 ...
| ...
| 55| 2013-03-15 | 0.5 | ... | 0.55
现在:我要做的是:
val groupedDF = myDF.groupBy("id").agg(
collect_list(struct(col("date",col("KPI_1"))).as("wrapped_KPI_1"),
collect_list(struct(col("date",col("KPI_2"))).as("wrapped_KPI_2")
// up until nth KPI
)
我会得到汇总数据,例如:
[("2012-12-12",0.1),("2012-12-12",0.2) ...
然后我会对这些打包的数据进行排序,使用一些 UDF 对这些聚合结果进行解包和映射,然后生成输出(计算差异和其他统计信息)。
另一种方法是使用窗口函数,例如:
val window = Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding,0L)
然后做:
val windowedDF = df.select (
col("id"),
col("date"),
col("KPI_1"),
collect_list(struct(col("date"),col("KPI_1"))).over(window),
collect_list(struct(col("date"),col("KPI_2"))).over(window)
)
这样我得到:
[("2012-12-12",0.1)]
[("2012-12-12",0.1), ("2012-12-13",0.1)]
...
这看起来更好处理,但我怀疑重复窗口会为每个 KPI 产生不必要的分组和排序。
以下是问题:
- 我宁愿采用分组方法?
- 我会去窗户吗?如果是这样,最有效的方法是什么?
【问题讨论】:
-
您应该使用窗口方法,但在选择之前,根据 id 重新分区数据框。这应该有助于减少洗牌操作。 val windowedDF = df.repartition(col("id")).select(...)
-
所以每次评估“窗口”时,我最终可能会导致“重新排序”?所以预先重新分区会产生已经分区和排序的数据,所以“窗口”将是“无操作”?有没有办法“重用”同一个窗口来发出更复杂的结构化数据?
-
是的。我之前在我的代码中尝试过多数投票。它只对数据框进行一次洗牌,并将其用于所有窗口函数。
-
好的,非常感谢,我认为您应该“回答”这个问题,以便更明显,我可以投票
标签: apache-spark apache-spark-sql