【问题标题】:How to delete duplicates in streaming data?如何删除流数据中的重复项?
【发布时间】:2019-08-19 00:56:25
【问题描述】:

我正在使用带有 kafka、java8 的 spark-sql 2.4.1 结构化流。 例如,我的数据集如下所示

我需要根据 ColA ColB ColC 找出重复项,并根据 ColDate 取出其中最新的一份,然后删除其余的。

即 从上面的数据结果应该是

如何使用火花流来完成? 即,我将在流中获取数据,例如......如果按照前面指定的逻辑重复我需要删除记录,则不确定何时出现重复。 在流媒体场景中一般如何完成?

【问题讨论】:

  • "取最新的"意思是取最小的?你为什么不 groupByminColDate?你试过了吗?然后,您必须加入自身才能获取其余字段。

标签: apache-spark apache-spark-sql spark-structured-streaming


【解决方案1】:

在一般流式传输场景中,您可以使用哈希表检查重复项,您可以每隔 x 小时清空一次。

  hashed =table['ColA','ColB','ColC']
  hashed=hashed.withColumn("row_sha2", sha2(concat_ws("||", *hashed.columns), 256))

要在稍后阶段删除重复项,您可以将它们删除

import pyspark.sql.functions as f
df.groupBy("ColA","ColB","ColC").agg((f.count("*")>1).cast("int").alias("e")).show()

【讨论】:

    【解决方案2】:

    如果您的重复项实际上是重复项(即相同的记录,没有干预),您可以使用恰好一次的数据传递概念 (here)。

    即使您缓存了“最新”记录,这会导致在内存中存储 N 条记录(最坏的情况)。此外,对于消费者何时可以消费“正确”的记录,也无法保证。

    我认为你应该考虑批量作业。

    【讨论】:

    • 我需要像上面一样清理数据,一旦我从 Kafka 主题中读取数据
    • 您是否至少有一个最长的时间范围来应对重复事件的可能性?
    • 什么是时间表?
    • @Nir Hedvat,抱歉,您是在谈论时间戳还是时间范围?如果时间范围是指水印延迟期?它的 5 分钟批处理
    • 重复事件之间的最长时间是多少(您知道...)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-08-31
    • 1970-01-01
    • 1970-01-01
    • 2018-12-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多