【发布时间】:2020-05-16 06:10:39
【问题描述】:
我正在使用graphframes、pyspark 和hive 处理图形数据。在处理数据时,我将构建一个图表,并最终将这些数据保存到一个 Hive 表中,我将不再更新它。
后续运行可能与之前运行的节点有关系,所以我要确保我不会重复数据。
例如,运行 #1 可能会找到节点:A、B、C。运行#2 可能会重新找到节点A,并且还会找到新的节点X、Y、Z。我不希望A 在我的表格中出现两次。
我正在寻找处理此问题的最佳方法,并希望解决以下问题:
- 在处理与其关联的元数据时,我需要跟踪节点的状态。在完成此处理后,我将仅希望将节点的数据持久保存到 Hive。
- 我想确保在遇到同一个节点时不会创建重复数据(例如,当我重新找到上面的
A节点时,我不想在 Hive 中插入另一行)
我目前正在修补最好的方法来做到这一点。我知道hive 现在支持 ACID 事务,但似乎pyspark 目前不支持 CRUD 类型的操作。所以这就是我的计划:
- 每次运行时,创建一个
dataframe来存储我找到的节点。 - 找到新节点时:检查 Hive 中是否已存在该节点(例如
sqlContext.sql("SELECT * FROM existingTable WHERE name="<NAME>")。如果不存在,则将dataframe更新为x = vertices.withColumn("name", F.when(F.col("id")=="a", "<THE-NEW-NAME>").otherwise(F.col("name")))以将其添加到我们的 Dataframe 中。 - 一旦所有节点都完成处理,创建一个临时视图:
x.createOrReplaceTempView("myTmpView") - 最后,使用
sqlContext.sql("INSERT INTO TABLE existingTable SELECT * FROM myTmpView")将我的临时视图中的数据插入到现有表中
我认为这会起作用,但它似乎非常hacky。我不确定这是否是由于我对 Hive/Spark 缺乏了解,或者这只是技术堆栈的性质。有一个更好的方法吗?以这种方式处理它是否有性能成本?
【问题讨论】:
标签: apache-spark hadoop pyspark hive apache-spark-sql