【问题标题】:Merge dataframes and pick the latest record合并数据框并选择最新记录
【发布时间】:2018-10-26 11:27:14
【问题描述】:

我有 2 个数据框。

df1:

|Timestamp                        |ProjectId|AusID|Version|
+---------------------------------+---------+-------------+
|2017-09-19 16:57:36.000642 +02:00|20034    |529  |2017   |
|2017-09-19 16:58:32.000642 +02:00|20035    |973  |2017   |
|2017-09-21 12:51:36.000642 +02:00|20034    |521  |2017   |
|2017-09-22 17:58:36.000642 +02:00|20035    |543  |2017   |

df2:

|Timestamp                        |ProjectId|AusID|Version|
+---------------------------------+---------+-------------+
|2017-09-20 08:46:17.465000 Z     |20034    |513  |2017   |
|2017-09-20 08:46:17.465000 Z     |20035    |973  |2017   |
|2017-09-21 08:46:17.465000 Z     |20034    |521  |2017   |
|2017-09-22 08:46:17.465000 Z     |20035    |587  |2017   |

这些记录数以百万计,列数不多。我想合并两个数据框并使用 AusID 删除重复项,即当两条记录具有相同的 AusID 时,选择最新的一条(根据日期)并删除另一条。另一个问题是,两个数据框中的日期格式也不同。

我尝试使用以下方法:

df1.union(df2).except(df1.intersect(df2)).show()  

但它似乎正在考虑所有列。如果有人能给出一些提示,那就太好了。

【问题讨论】:

  • 合并两个数据框并使用窗口函数分组选择最新的时间戳。
  • @ShankarKoirala 数据大小为几个 100 GB。合并所有记录是个好主意吗?
  • @WaqarAhmed 联合操作不会对数据进行洗牌。所以,我们不必担心性能。
  • 好的。我将研究窗口函数和联合。谢谢。
  • @himanshuIIITian 做窗口功能还是做洗牌?

标签: apache-spark join merge apache-spark-sql databricks


【解决方案1】:

您可以考虑以下方法:

result = df1.unionAll(df2)
import org.apache.spark.sql.expressions._

val windowSpec = Window.partitionBy("ProjectId","AusID","Version").orderBy(col("Timestamp").asc)
val latestForEachKey = result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")
latestForEachKey.show(false)

【讨论】:

  • 谢谢。我们必须按除日期以外的所有列进行分区吗?
  • 是的。我们必须按除日期以外的所有列进行分区。
  • 我能问一下这背后的直觉吗?
  • 上面的任何操作都做shuffle吗?
  • 没有..没有洗牌
猜你喜欢
  • 1970-01-01
  • 2016-12-08
  • 2019-06-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-11-29
  • 2016-11-07
  • 1970-01-01
相关资源
最近更新 更多