【问题标题】:pyspark remove just consecutive duplicated rowspyspark 只删除连续的重复行
【发布时间】:2021-07-04 21:26:24
【问题描述】:

经过batch_date 的一些迭代和连接后,我有了一个数据框。让我们关注 r_id0==0r_id0==1

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 1|b10192333e9c61a40...|
|    0|2020-04-30|2020-04-30|9999-12-31|  Esto es un campo 1|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    1|2020-02-28|2020-02-28|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    1|2020-03-31|2020-03-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
|    2|2020-02-28|2020-02-28|2020-03-30|  Esto es un campo 4|fa1cfe1eaf9b14f88...|
|    2|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 3|e4ffd0cd4a6cf1193...|
|    2|2020-04-30|2020-04-30|9999-12-31| Esto es un campo 23|0fdb24b8fcf8603ee...|
|    3|2020-02-28|2020-02-28|2020-03-30|  Esto es un campo 3|a3a6870ca9b42ad06...|
|    3|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 2|18b24f88271e99618...|
|    4|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 5|2fe50db0156cfc909...|
|    6|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 6|7c6d329b73d9de59f...|
|    6|2020-04-30|2020-04-30|9999-12-31|Esto es un campo 77|d70c9340f83167e95...|
+-----+----------+----------+----------+--------------------+--------------------+

我确实需要删除具有相同 hash 列的行,但只删除 batch_date 列中与连续月份相关的行。在这种情况下,我会从第一次出现重复项到最后一个 until 列的值获取所有内容。非连续重复必须保持不变。

示例

  • r_id0==0 必须保持不变。在 batch_date 列中,相应条目之间存在 3 个月的间隔。
  • r_id0==1 必须只有一行。它的 batch_dates 个月是连续的。 结果应该是这样的:
+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 42|b10192333e9c61a40...|
|    0|2020-04-30|2020-04-30|9999-12-31|  Esto es un campo 42|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b10192333e9c61a40...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
etc etc... 

我为 r_id0==1 执行了我想要的功能,但它也会修改 r_id0==0 的情况:

def removing_duplicates2(final_df):
    '''
    Removing duplicates. It takes all from first time they appear and last "until" column.
    '''
    # we create this list in order to avor_id iteration over the column "hash", which will be our groupby column
    iterating_columns= final_df.columns
    iterating_columns.remove("hash")
    exprs =  [F.first(x).alias(x) if x!="until" else F.last(x).alias(x)  for x in iterating_columns] 
    return  (
         final_df.groupBy("hash").agg(*exprs)
            .dropDuplicates(["hash"]).select(columns)
         )

这是结果,破坏了r_id0==0

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 42|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
etc etc

所以,我很困惑。我必须用 Pyspark 来做,这个例子将用于一个巨大的,该死的巨大的表,由 batch_date 分区。 我坚信我创建的每一个循环都是朝着服务器爆炸和消防员指责我迈出的一步(我已经在使用一个(用于迭代 batch_date))。

对于冗长的描述,我们深表歉意, 任何意见或建议都非常受欢迎。

谢谢!

【问题讨论】:

  • 您对 关于 batch_date 的相邻的 到底是什么意思?两个 batch_dates 之间的差异可以有多大,以至于它们仍然被认为是相邻的? 2020-02-142020-03-31 的日期是否足够接近?
  • 抱歉,我会更新说明。提到的日期有连续的月份,所以是的,它适用于您的示例

标签: pyspark apache-spark-sql duplicates


【解决方案1】:

我们的想法是在第一步中使用Window 识别只有连续月份的组。然后只对第一步中找到的行进行分组,同时保持所有其他行不变。

创建一些测试数据:

from datetime import date
data = [[0, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("2020-02-27"), "b10192333e9c61a40"],
    [0, date.fromisoformat("2020-04-30"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
    [0, date.fromisoformat("2020-05-01"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
    [1, date.fromisoformat("2019-12-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-02-28"), date.fromisoformat("2020-02-28"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [2, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "abcd"]]

df=spark.createDataFrame(data, schema=["r_id0","batch_date","from","until","hash"])

识别那些仅包含连续月份的r_id0 值:

from pyspark.sql import functions as F

w = Window.partitionBy("r_id0").orderBy("batch_date")
df2=df.withColumn("prev_bd_month", F.month(F.lag("batch_date").over(w)))\
    .withColumn("prev_bd_year", F.year(F.lag("batch_date").over(w))) \
    .withColumn("adj", F.when((F.year("batch_date").eqNullSafe(F.col("prev_bd_year")) 
                              & (F.month("batch_date") - F.col("prev_bd_month") == 1)) 
                              | ((F.year("batch_date") - F.col("prev_bd_year") == 1) 
                              & (F.month("batch_date") == 1) & (F.col("prev_bd_month") == 12) )
                              | F.col("prev_bd_year").isNull() ,1).otherwise(None)) \
    .groupBy("r_id0") \
    .agg(F.count("*").alias("count_all"), F.sum("adj").alias("count_adj")) \
    .withColumn("all_adj", F.col("count_all") == F.col("count_adj")) \
    .drop("count_all", "count_adj") \
    .join(df, "r_id0") \
    .cache()

中间结果:

+-----+-------+----------+----------+----------+-----------------+
|r_id0|all_adj|batch_date|      from|     until|             hash|
+-----+-------+----------+----------+----------+-----------------+
|    0|  false|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
|    0|  false|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
|    0|  false|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
|    1|   true|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-01-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-02-28|2020-02-28|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-03-31|2020-03-31|9999-12-31| 63b3e8201cd417bb|
|    2|   true|2020-03-31|2020-03-31|9999-12-31|             abcd|
+-----+-------+----------+----------+----------+-----------------+

all_adj == true 分组行并保留所有其他行:

df3=df2.filter("all_adj == true") \
    .groupBy("r_id0") \
    .agg(F.min("batch_date").alias("batch_date"), 
         F.expr("min_by(from, batch_date)").alias("from"),
         F.max("until").alias("until"),
         F.expr("min_by(hash, batch_date)").alias("hash")) \
    .union(df2.filter("all_adj == false").drop("all_adj"))

结果:

+-----+----------+----------+----------+-----------------+
|r_id0|batch_date|      from|     until|             hash|
+-----+----------+----------+----------+-----------------+
|    1|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    2|2020-03-31|2020-03-31|9999-12-31|             abcd|
|    0|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
|    0|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
|    0|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
+-----+----------+----------+----------+-----------------+

【讨论】:

    猜你喜欢
    • 2021-09-14
    • 1970-01-01
    • 2020-03-30
    • 1970-01-01
    • 2018-08-11
    • 2018-12-01
    • 2020-11-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多