【问题标题】:Partially replace dataframe with rows from another用另一个行部分替换数据框
【发布时间】:2021-09-03 04:02:40
【问题描述】:

我有两个相似的数据框,一个有一个日期,另一个有多个日期加上一个附加列:

df:

| yyyy_mm_dd | id  | region | country  | product | count |
|------------|-----|--------|----------|---------|-------|
| 2021-06-14 | 111 | EMEA   | Spain    | P1      | 10    |
| 2021-06-14 | 111 | EMEA   | England  | P1      | 9     |
| 2021-06-14 | 111 | EMEA   | France   | P1      | 10    |
| 2021-06-14 | 111 | EMEA   | Spain    | P2      | 299   |
| 2021-06-14 | 111 | EMEA   | England  | P2      | 39    |
| 2021-06-14 | 111 | EMEA   | France   | P2      | 10    |
| 2021-06-14 | 112 | LATAM  | Brazil   | P1      | 64    |
| 2021-06-14 | 112 | LATAM  | Paraguay | P2      | 21    |
| 2021-06-14 | ... | ...    | ...      | ...     | ...   |

df1:

| yyyy_mm_dd | id  | region | country  | product | count | fullfilments |
|------------|-----|--------|----------|---------|-------|--------------|
| 2021-06-14 | 111 | EMEA   | Spain    | P1      | 1     | 1            |
| 2021-06-14 | 111 | EMEA   | England  | P1      | 1     | 3            |
| 2021-06-14 | 111 | EMEA   | France   | P1      | 2     | 4            |
| 2021-06-14 | 111 | EMEA   | Spain    | P2      | 1     | 1            |
| 2021-06-14 | 111 | EMEA   | England  | P2      | 2     | 1            |
| 2021-06-14 | 111 | EMEA   | France   | P2      | 1     | 5            |
| 2021-06-14 | 112 | LATAM  | Brazil   | P1      | 2     | 2            |
| 2021-06-14 | 112 | LATAM  | Paraguay | P2      | 21    | 1            |
| 2021-06-14 | ... | ...    | ...      | ...     | ...   | ...          |
| 2021-06-13 | 111 | EMEA   | Spain    | P1      | 0     | 1            |
| 2021-06-13 | 111 | EMEA   | England  | P2      | 0     | 2            |

df1 有很多日期的分组数据,而 df 只有一个日期。我想用 df 中的 count 替换 df1 中的 count 列,以匹配行(yyyy_mm_dd、id、region、country、product)并保留 fullfilments。

我可能将两者结合在一起并在第一个 df 中删除计数,但是我只想替换日期匹配的位置并保留 df1 中的所有其他行。

【问题讨论】:

    标签: python-3.x apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以简单地join 并使用coalesce 函数。

    当您从第一个数据帧到第二个数据帧执行 left join 时,唯一匹配的记录具有非空 new_count 值。现在,使用coalesce 函数,该函数将在第一个值不为空时返回,而在第一个为空时返回第二个值。

    coalesce(a   , b   ) => a
    coalesce(a   , null) => a
    coalesce(null, b   ) => b
    

    从您的数据帧中,

    from pyspark.sql import functions as f
    
    df1 = spark.read.option("inferSchema","true").option("header","true").csv("test1.csv")
    
    +----------+---+------+--------+-------+-----+
    |yyyy_mm_dd|id |region|country |product|count|
    +----------+---+------+--------+-------+-----+
    |2021-06-14|111|EMEA  |Spain   |P1     |10   |
    |2021-06-14|111|EMEA  |England |P1     |9    |
    |2021-06-14|111|EMEA  |France  |P1     |10   |
    |2021-06-14|111|EMEA  |Spain   |P2     |299  |
    |2021-06-14|111|EMEA  |England |P2     |39   |
    |2021-06-14|111|EMEA  |France  |P2     |10   |
    |2021-06-14|112|LATAM |Brazil  |P1     |64   |
    |2021-06-14|112|LATAM |Paraguay|P2     |21   |
    +----------+---+------+--------+-------+-----+
    
    df2 = spark.read.option("inferSchema","true").option("header","true").csv("test2.csv")
    
    +----------+---+------+--------+-------+-----+------------+
    |yyyy_mm_dd|id |region|country |product|count|fullfilments|
    +----------+---+------+--------+-------+-----+------------+
    |2021-06-14|111|EMEA  |Spain   |P1     |1    |1           |
    |2021-06-14|111|EMEA  |England |P1     |1    |3           |
    |2021-06-14|111|EMEA  |France  |P1     |2    |4           |
    |2021-06-14|111|EMEA  |Spain   |P2     |1    |1           |
    |2021-06-14|111|EMEA  |England |P2     |2    |1           |
    |2021-06-14|111|EMEA  |France  |P2     |1    |5           |
    |2021-06-14|112|LATAM |Brazil  |P1     |2    |2           |
    |2021-06-14|112|LATAM |Paraguay|P2     |21   |1           |
    |2021-06-13|111|EMEA  |Spain   |P1     |0    |1           |
    |2021-06-13|111|EMEA  |England |P2     |0    |2           |
    +----------+---+------+--------+-------+-----+------------+
    

    两个数据帧的连接如下:

    cols_to_join = ['yyyy_mm_dd', 'id', 'region', 'country', 'product']
    df3 = df2.join(df1.withColumnRenamed('count', 'new_count'), cols_to_join, 'left') \
             .withColumn('count', f.coalesce('new_count', 'count')).drop('new_count')
    df3.show(truncate=False)
    
    +----------+---+------+--------+-------+-----+------------+
    |yyyy_mm_dd|id |region|country |product|count|fullfilments|
    +----------+---+------+--------+-------+-----+------------+
    |2021-06-14|111|EMEA  |Spain   |P1     |10   |1           |
    |2021-06-14|111|EMEA  |England |P1     |9    |3           |
    |2021-06-14|111|EMEA  |France  |P1     |10   |4           |
    |2021-06-14|111|EMEA  |Spain   |P2     |299  |1           |
    |2021-06-14|111|EMEA  |England |P2     |39   |1           |
    |2021-06-14|111|EMEA  |France  |P2     |10   |5           |
    |2021-06-14|112|LATAM |Brazil  |P1     |64   |2           |
    |2021-06-14|112|LATAM |Paraguay|P2     |21   |1           |
    |2021-06-13|111|EMEA  |Spain   |P1     |0    |1           |
    |2021-06-13|111|EMEA  |England |P2     |0    |2           |
    +----------+---+------+--------+-------+-----+------------+
    

    【讨论】:

    • 我实际上也找到了类似的解决方案!虽然我最初的问题有一个小错误。当日期匹配时,是否也可以保留 df1 的计数。 IE。当df2.yyyy_mm_dd df1.yyyy_mm_dd 时,我只想要df2 计数,否则df1.count。我想在加入之前做date_add(df1.yyyy_mm_dd,1),我认为它会起作用,但有点hacky
    • 我根据 yyyy_mm_dd 使用 .when().otherwise() 使其工作。
    【解决方案2】:

    每次您需要从不同的数据框中检索一列时,您都必须加入它们:

    import pyspark.sql.functions as f
    
    df2 = df1.join(df.withColumnRenamed('count', 'new_count'),
                   on=['yyyy_mm_dd', 'id', 'region', 'country', 'product'], how='left')
    
    df2 = (df2
           .withColumn('count', f.coalesce('new_count', 'count'))
           .drop('new_count'))
    df2.show(truncate=False)
    

    【讨论】:

    • 这不会删除 df1 中不在 df 中的每个日期吗?我想保留它们,只替换示例中的匹配日期 2021-06-14。
    • 我仍然认为这是不正确的,因为您将删除 df1 中所有行的计数,并且在左加入后,只会计数 2021-06-14。 IE。在示例 df1 中,最后两行将丢失。我想保留 df1 中的所有行并仅替换匹配日期的计数。
    • 再检查一遍。
    • 我尝试了最新的解决方案,但错误count 不明确,因为它在两个数据帧中。具体就是这部分.withColumn('count', f.coalesce('new_count', 'count'))
    • 您是否复制了在join 函数中指定的df.withColumnRenamed('count', 'new_count')
    猜你喜欢
    • 1970-01-01
    • 2021-01-24
    • 2021-02-18
    • 1970-01-01
    • 2020-11-13
    • 1970-01-01
    • 2017-05-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多