【问题标题】:How to update a pyspark dataframe with new values from another dataframe?如何使用来自另一个数据帧的新值更新 pyspark 数据帧?
【发布时间】:2019-01-22 00:00:29
【问题描述】:

我有两个 spark 数据框:

数据框 A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

和数据框 B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

数据框 B 可以包含来自数据框 A 的重复、更新和新行。我想在 spark 中编写一个操作,我可以在其中创建一个新数据框,其中包含来自数据框 A 的行以及来自数据框 B 的更新行和新行。

我首先创建了一个哈希列,其中仅包含不可更新的列。这是唯一的标识。所以假设col1col2 可以更改值(可以更新),但col3,..,coln 是唯一的。我创建了一个哈希函数为hash(col3,..,coln):

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在我想编写一些 spark 代码,基本上从 B 中选择散列不在 A 中的行 (所以新行和更新行) 并将它们与来自 A 的行。如何在 pyspark 中实现这一点?

编辑: 数据框 B 可以有来自数据框 A 的额外列,因此无法进行联合。

示例示例

数据框 A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

数据框 B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

结果: 数据框 C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

【问题讨论】:

  • 您在寻找类似this answer 的东西吗?而不是散列,更好的方法是加入唯一 id。
  • 这与那个答案不相似,因为对我来说,我还需要保留来自数据框 B 的新行。
  • 我需要一个哈希列,因为我没有唯一的 id 列。
  • 您可以加入多个列,这应该相当于散列,但根据您的问题很难判断。你能提供一个reproducible example 一些小样本输入/所需的输出吗?

标签: python pyspark


【解决方案1】:

这与update a dataframe column with new values 密切相关,除了您还想从 DataFrame B 添加行。一种方法是首先执行链接问题中概述的内容,然后将结果与 DataFrame B 合并并删除重复项.

例如:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

如果您有很多列要替换并且您不想全部硬编码,则更一般地使用列表推导:

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *[
            ['col_1'] + 
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] + 
            ['b.col_3']
        ]
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()

【讨论】:

  • 这可以使用合并简化吗?
  • @PaulBrannan 可能是的。随时发送edit 或发布您自己的答案。
【解决方案2】:

我会选择不同的解决方案,我认为它不那么冗长、更通用并且不涉及列列表。我将首先通过基于 keyCols(列表)执行内部连接来识别将被更新(replaceDf)的 dfA 子集。然后我会从 dfA 中减去这个 replaceDF 并将其与 dfB 合并。

    replaceDf = dfA.alias('a').join(dfB.alias('b'), on=keyCols, how='inner').select('a.*')
    resultDf = dfA.subtract(replaceDf).union(dfB).show()

即使 dfA 和 dfB 中有不同的列,您仍然可以通过从两个 DataFrame 中获取列列表并找到它们的联合来克服这个问题。然后我会 准备选择查询(而不是“select.('a.')*”),以便我只列出 dfB 中存在的 dfA 中的列 + “null as colname” 中不存在的列在dfB。

【讨论】:

    【解决方案3】:

    如果您只想保留唯一值,并要求严格正确的结果,那么union 后跟dropDupilcates 应该可以解决问题:

    columns_which_dont_change = [...]
    old_df.union(new_df).dropDuplicates(subset=columns_which_dont_change)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-19
      • 2016-12-20
      • 2015-12-31
      • 2018-12-25
      • 2020-09-20
      • 2019-01-28
      • 2019-07-10
      • 1970-01-01
      相关资源
      最近更新 更多