【问题标题】:How to persist data to Hive from PySpark - Avoiding duplicates如何将数据从 PySpark 持久化到 Hive - 避免重复
【发布时间】:2020-05-16 06:10:39
【问题描述】:

我正在使用graphframespysparkhive 处理图形数据。在处理数据时,我将构建一个图表,并最终将这些数据保存到一个 Hive 表中,我将不再更新它。

后续运行可能与之前运行的节点有关系,所以我要确保我不会重复数据。

例如,运行 #1 可能会找到节点:ABC。运行#2 可能会重新找到节点A,并且还会找到新的节点XYZ。我不希望A 在我的表格中出现两次。

我正在寻找处理此问题的最佳方法,并希望解决以下问题:

  1. 在处理与其关联的元数据时,我需要跟踪节点的状态。在完成此处理后,我将希望将节点的数据持久保存到 Hive。
  2. 我想确保在遇到同一个节点时不会创建重复数据(例如,当我重新找到上面的A 节点时,我不想在 Hive 中插入另一行)

我目前正在修补最好的方法来做到这一点。我知道hive 现在支持 ACID 事务,但似乎pyspark 目前不支持 CRUD 类型的操作。所以这就是我的计划:

  1. 每次运行时,创建一个dataframe 来存储我找到的节点。
  2. 找到新节点时:检查 Hive 中是否已存在该节点(例如 sqlContext.sql("SELECT * FROM existingTable WHERE name="<NAME>")。如果不存在,则将 dataframe 更新为 x = vertices.withColumn("name", F.when(F.col("id")=="a", "<THE-NEW-NAME>").otherwise(F.col("name"))) 以将其添加到我们的 Dataframe 中。
  3. 一旦所有节点都完成处理,创建一个临时视图:x.createOrReplaceTempView("myTmpView")
  4. 最后,使用sqlContext.sql("INSERT INTO TABLE existingTable SELECT * FROM myTmpView") 将我的临时视图中的数据插入到现有表中

我认为这会起作用,但它似乎非常hacky。我不确定这是否是由于我对 Hive/Spark 缺乏了解,或者这只是技术堆栈的性质。有一个更好的方法吗?以这种方式处理它是否有性能成本?

【问题讨论】:

    标签: apache-spark hadoop pyspark hive apache-spark-sql


    【解决方案1】:

    在 deltalake api 中,使用 scala 和 python 支持 upserts(Merge)。这正是您想要实现的。

    https://docs.delta.io/latest/delta-update.html#merge-examples

    这是另一种解决方案

    1. 在您的表中有一个列 updated_time 时间戳
    2. 联合 prev_run_results 和 current_run_results
    3. 按“节点”分组,选择最新的时间戳
    4. 保存结果

    【讨论】:

    • 使用建议的替代解决方案,这不会在写回之前读取所有数据吗?如果数据非常大,这似乎是不可取的?
    • 是的,这是一种高效的读取方式,如果您想高效写入,请重复写入,然后在读取时只获取最新的(使用第 3 步)。
    猜你喜欢
    • 2019-10-23
    • 1970-01-01
    • 2017-05-10
    • 1970-01-01
    • 1970-01-01
    • 2018-08-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多