【问题标题】:Update only changed rows pyspark delta table databricks仅更新更改的行 pyspark delta table databricks
【发布时间】:2020-09-25 16:34:10
【问题描述】:

与创建的数据框相比,需要仅更新现有表中更改的行。所以现在,我确实减去并获取更改的行,但不确定如何合并到现有表中。

old_df = spark.sql("select * from existing table")
diff = new_df.subtract(old_df)

现在必须插入差异数据帧(如果是新行)或更新现有记录

(deltaTable.alias("full_df").merge(
    merge_df.alias("append_df"),
    "full_df.col1 = append_df.col1 OR full_df.col2 =append_df.col2") 
  .whenNotMatchedInsertAll() 
  .execute()
)

这不会更新现有记录(案例:col2 值已更改;col1 未更改)

【问题讨论】:

  • 将数据写入临时表并使用 jdbc 插入更新。

标签: pyspark merge databricks delta


【解决方案1】:

.whenMatchedUpdateAll() 接受一个可用于保留未更改行的条件:

(deltaTable.alias("full_df").merge(
    merge_df.alias("append_df"),
    "full_df.col1 = append_df.col1 OR full_df.col2 = append_df.col2") 
  .whenNotMatchedInsertAll()
  .whenMatchedUpdateAll("full_df.col1 != append_df.col1 OR full_df.col2 != append_df.col2")
  .execute()
)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-09-23
    • 2022-01-02
    • 2022-10-15
    • 1970-01-01
    • 2021-07-26
    • 1970-01-01
    • 2021-06-26
    相关资源
    最近更新 更多