【问题标题】:How to remove duplicates from a spark data frame while retaining the latest?如何在保留最新数据的同时从 spark 数据框中删除重复项?
【发布时间】:2019-09-03 17:14:01
【问题描述】:

我正在使用 spark 从 Amazon S3 加载 json 文件。我想根据保留最新的数据框的两列删除重复项(我有时间戳列)。最好的方法是什么?请注意,重复项可能分布在多个分区中。我可以在不改组的情况下删除保留最后一条记录的重复项吗?我正在处理 1 TB 的数据。

我正在考虑通过这两个列对数据帧进行分区,这样所有重复的记录都将“一致地散列”到同一个分区中,因此分区级别排序之后是删除重复项将消除所有重复项,只保留一个。我不知道这是否可能。任何信息表示赞赏。

【问题讨论】:

标签: pyspark apache-spark-sql


【解决方案1】:

使用 row_number() 窗口函数可能更容易完成您的任务,c1 下面是时间戳列,c2c3 是用于对数据进行分区的列:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# set rn with F.row_number() and filter the result by rn == 1
df_new = df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn')
df_new.show()

编辑:

如果您只需要重复项并删除唯一行,则添加另一个字段:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# window to cover all rows in the same partition
win2 = Window.partitionBy('c2', 'c3') \
             .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# set new columns: rn, cnt and filter the result by rn == 1 and cnt > 1
df_new = df.withColumn('rn', F.row_number().over(win)) \
           .withColumn('cnt', F.count('c1').over(win2)) \
           .where('rn = 1 and cnt > 1') \
           .drop('rn', 'cnt')
df_new.show()

【讨论】:

  • 我删除了我的答案,因为没有加入这个答案会更有效率!
  • 这个怎么样? ` window = Window.partitionBy(partition_columns).orderBy(F.desc(sort_key))` Window.partitionBy(partition_columns).orderBy(F.asc(sort_key)) data_frame = data_frame.withColumn('rank', F.rank().over(window))` .withColumn('row_number', F.row_number().over(window))` .filter((F.col('rank') == 1) & (F.col('row_number') == 1)).drop('rank', 'row_number')跨度>
  • @lalatnayak 您是否对 rank() 和 row_number() 使用相同的窗口规范?我认为它们在默认窗口范围内返回相同的值。
  • 感谢您的指出。实际上,我认为排名对我来说就足够了。快速提问。过滤器和联合(使用不同方案的数据框)对分区有什么影响。
  • @lalatnayak,不知道为什么要将数据框与不同的方案联合起来。你的意思是join?我不认为filterunion 对分区有直接影响。这真的取决于方法链。即 union(..) 后跟 distinct()。你可以试试explain(),看看你的转换是否涉及分区或洗牌。我不是这方面的专家,所以上面只是我的 2 美分。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-30
  • 2016-10-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-17
  • 2012-11-25
  • 2021-09-12
相关资源
最近更新 更多