【问题标题】:PySpark DataFrame: Change cell value based on min/max condition in another columnPySpark DataFrame:根据另一列中的最小/最大条件更改单元格值
【发布时间】:2018-07-03 13:27:11
【问题描述】:

我有以下 pySpark 数据框:

+------------------+------------------+--------------------+--------------+-------+
|              col1|              col2|                col3|             X|      Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046|   6.607603368496324|             1|   null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733|             0|   null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554|             3|   null|
| 2.608497168338446| 3.529397129549324|   0.373034222141551|             2|   null|
+------------------+------------------+--------------------+--------------+-------+

这是一个相当简单的操作,我可以用 pandas 轻松完成。但是,我只需要使用 pySpark 即可。

我想做以下事情(我会写一些伪代码):

在 col3 == max(col3) 的行中,将 Y 从 null 更改为 'K'

在剩余的行中,在 col1 == max(col1) 的行中,将 Y 从 null 更改为 'Z'

在剩余的行中,在 col1 == min(col1) 的行中,将 Y 从 null 更改为 'U'

在剩余行中:将 Y 从 null 更改为 'I'。

因此,预期的输出是:

+------------------+------------------+--------------------+--------------+-------+
|              col1|              col2|                col3|             X|      Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046|   6.607603368496324|             1|      K|
|0.2654841575294071|1.2633077949463256|0.023578679968183733|             0|      U|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554|             3|      I|
| 2.608497168338446| 3.529397129549324|   0.373034222141551|             2|      Z|
+------------------+------------------+--------------------+--------------+-------+

完成后,我需要将此表用作另一个表的查找:

+--------------------+--------+-----+------------------+--------------+------------+
|                  x1|      x2|   x3|                x4|             X|           d|
+--------------------+--------+-----+------------------+--------------+------------+
|0057f68a-6330-42a...|    2876|   30| 5.989999771118164|             0|    20171219|
|05cc0191-4ee4-412...|  108381|   34|24.979999542236328|             3|    20171219|
|06f353af-e9d3-4d0...|  118798|   34|               0.0|             3|    20171219|
|0c69b607-112b-4f3...|   20993|   34|               0.0|             0|    20171219|
|0d1b52ba-1502-4ff...|   23817|   34|               0.0|             0|    20171219|

我想使用第一个表作为查找来在第二个表中创建一个新列。新列的值应在第一个表的 Y 列中查找,使用第二个表中的 X 列作为键(因此我们在第一个表中的 Y 列中查找与 X 列中的值相对应的值,这些值来自 X 列第二张表)。

UPD:我需要一个对满足两个条件的行具有鲁棒性的解决方案,例如:

+------------------+------------------+--------------------+--------------+-------+
|              col1|              col2|                col3|             X|      Y|
+------------------+------------------+--------------------+--------------+-------+
| 2.608497168338446| 3.558069532647046|   6.607603368496324|             1|   null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733|             0|   null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554|             3|   null|
|2.1729247374294496| 3.529397129549324|   0.373034222141551|             2|   null|
+------------------+------------------+--------------------+--------------+-------+

在这种情况下,第 0 行同时满足 max('col3') 和 max('col1') 条件。

所以需要做的是:

第 0 行变为“K”

第 3 行变为 'Z'(因为在剩余行中(0 已经有 'K' 第 3 行满足 max('col1') 条件)

第 1 行变为“U”

第 2 行变成“我”

我不能在表 1 中有多个行,其中包含“I”。

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    计算聚合:

    from pyspark.sql import functions as F
    
    df = spark.createDataFrame([
        (2.1729247374294496,  3.558069532647046,    6.607603368496324, 1),
        (0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0),
        (0.4253301781296708, 3.4566490739823483,  0.11711202266039554, 3),
        (2.608497168338446,  3.529397129549324,    0.373034222141551, 2)
    ], ("col1", "col2", "col3", "x"))
    
    min1, max1, max3 = df.select(F.min("col1"), F.max("col1"), F.max("col3")).first()
    

    when添加列:

    y = (F.when(F.col("col3") == max3, "K")  # In row where col3 == max(col3), change Y from null to 'K'
        .when(F.col("col1") == max1, "Z")    # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
        .when(F.col("col1") == min1, "U")    # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
        .otherwise("I"))                     # In the remaining row: change Y from null to 'I'
    
    df_with_y = df.withColumn("y", y)
    
    
    df_with_y.show()
    # +------------------+------------------+--------------------+---+---+
    # |              col1|              col2|                col3|  x|  y|
    # +------------------+------------------+--------------------+---+---+
    # |2.1729247374294496| 3.558069532647046|   6.607603368496324|  1|  K|
    # |0.2654841575294071|1.2633077949463256|0.023578679968183733|  0|  U|
    # |0.4253301781296708|3.4566490739823483| 0.11711202266039554|  3|  I|
    # | 2.608497168338446| 3.529397129549324|   0.373034222141551|  2|  Z|
    # +------------------+------------------+--------------------+---+---+
    

    应使用第二个表中的 X 列作为键,在第一个表的 Y 列中查找新列的值

    df_with_y.select("x", "Y").join(df2, ["x"])
    

    如果y 已经存在,并且你要保留非空值:

    df_ = spark.createDataFrame([
        (2.1729247374294496,  3.558069532647046,    6.607603368496324, 1, "G"),
        (0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0, None),
        (0.4253301781296708, 3.4566490739823483,  0.11711202266039554, 3, None),
        (2.608497168338446,  3.529397129549324,    0.373034222141551, 2, None)
    ], ("col1", "col2", "col3", "x", "y"))
    
    min1_, max1_, max3_ = df.filter(F.col("y").isNull()).select(F.min("col1"), F.max("col1"), F.max("col3")).first()
    
    y_ = (F.when(F.col("col3") == max3_, "K") 
        .when(F.col("col1") == max1_, "Z")
        .when(F.col("col1") == min1_, "U") 
        .otherwise("I"))
    
    df_.withColumn("y", F.coalesce(F.col("y"), y_)).show()
    
    
    # +------------------+------------------+--------------------+---+---+
    # |              col1|              col2|                col3|  x|  y|
    # +------------------+------------------+--------------------+---+---+
    # |2.1729247374294496| 3.558069532647046|   6.607603368496324|  1|  G|
    # |0.2654841575294071|1.2633077949463256|0.023578679968183733|  0|  U|
    # |0.4253301781296708|3.4566490739823483| 0.11711202266039554|  3|  I|
    # | 2.608497168338446| 3.529397129549324|   0.373034222141551|  2|  K|
    # +------------------+------------------+--------------------+---+---+
    

    如果您遇到数值精度问题,可以尝试:

    threshold = 0.0000001 # Choose appropriate 
    
    y_t = (F.when(F.abs(F.col("col3") - max3) < threshold, "K")  # In row where col3 == max(col3), change Y from null to 'K'
        .when(F.abs(F.col("col1") - max1) < threshold, "Z")    # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
        .when(F.abs(F.col("col1") - min1) < threshold, "U")    # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
        .otherwise("I"))                     # In the remaining row: change Y from null to 'I'
    
    df.withColumn("y", y_t).show()
    # +------------------+------------------+--------------------+---+---+
    # |              col1|              col2|                col3|  x|  y|
    # +------------------+------------------+--------------------+---+---+
    # |2.1729247374294496| 3.558069532647046|   6.607603368496324|  1|  K|
    # |0.2654841575294071|1.2633077949463256|0.023578679968183733|  0|  U|
    # |0.4253301781296708|3.4566490739823483| 0.11711202266039554|  3|  I|
    # | 2.608497168338446| 3.529397129549324|   0.373034222141551|  2|  Z|
    # +------------------+------------------+--------------------+---+---+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-09-30
      • 1970-01-01
      • 2021-07-25
      • 1970-01-01
      • 2020-02-26
      • 2017-11-23
      • 2022-11-14
      • 1970-01-01
      相关资源
      最近更新 更多