【问题标题】:Conditionally replace value in a row from another row value in the same column based on value in another column in Pyspark?根据Pyspark中另一列中的值有条件地从同一列中的另一行值替换一行中的值?
【发布时间】:2020-05-20 20:11:31
【问题描述】:

这在网络上有一些变化,但不是我所期望的。 我有一个像这样的数据框:

     +------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|New_UL  |New_LL   |
+------+-------+------------+---------------+----------------+--------+---------+
|790026|9160   |0           |1              |0               |26.1184 |23.2954  |
|790026|13509  |0           |0              |1               |Infinity|-Infinity|
|790026|9162   |0           |0              |0               |25.03535|23.48585 |
|790026|13510  |0           |0              |1               |Infinity|-Infinity|
|790048|9162   |0           |0              |0               |33.5    |30.5     |
|790048|13509  |0           |0              |1               |Infinity|-Infinity|
|790048|13510  |0           |0              |0               |NaN     |NaN      |
|790048|9160   |0           |1              |0               |33.94075|30.75925 |
+------+-------+------------+---------------+----------------+--------+---------+

我想将use_golden_limit 为1 的New_ULNew_LL 值替换为每个SEQ_IDis_golden_limit 为1 的值。所以,在这种情况下,预期的结果是:

 +------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|New_UL  |New_LL   |
+------+-------+------------+---------------+----------------+--------+---------+
|790026|9160   |0           |1              |0               |26.1184 |23.2954  |
|790026|13509  |0           |0              |1               |26.1184 |23.2954  |
|790026|9162   |0           |0              |0               |25.03535|23.48585 |
|790026|13510  |0           |0              |1               |26.1184 |23.2954  |
|790048|9162   |0           |0              |0               |33.5    |30.5     |
|790048|13509  |0           |0              |1               |33.94075|30.75925 |
|790048|13510  |0           |0              |0               |NaN     |NaN      |
|790048|9160   |0           |1              |0               |33.94075|30.75925 |
+------+-------+------------+---------------+----------------+--------+---------+

这可能吗?

【问题讨论】:

  • “1”中的“is_golden_limit”是否预计超过一行?
  • @Mitodina,理想情况下is_golden_limit= 1 不应超过一行。我有代码来识别此类情况以单独处理它们。不过,这是个好问题。如果它确实有多个 =1 的行,是否取第一个值?
  • @thentangler 请检查我的解决方案

标签: pyspark pyspark-sql pyspark-dataframes


【解决方案1】:

根据要求,它只会为每个 ID 取 is_golden_limit 的第一个值。

创建您的数据框

from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import *
import numpy as np

list=[[790026,9160,0,1,0,26.1184,23.2954],
[790026,13509,0,0,1,np.inf,-np.inf],
[790026,9162,0,0,0,25.03535,23.48585],
[790026,13510,0,0,1,np.inf,-np.inf],
[790048,9162,0,0,0,33.5,30.5],
[790048,13509,0,0,1,np.inf,-np.inf],
[790048,13510,0,0,0,np.NaN,np.NaN],
[790048,9160,0,1,0,33.94075,30.75925 ]]

df= spark.createDataFrame(list,['SEQ_ID','TOOL_ID','isfleetlevel','is_golden_limit','use_golden_limit','New_UL','New_LL'])

+------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|  New_UL|   New_LL|
+------+-------+------------+---------------+----------------+--------+---------+
|790026|   9160|           0|              1|               0| 26.1184|  23.2954|
|790026|  13509|           0|              0|               1|Infinity|-Infinity|
|790026|   9162|           0|              0|               0|25.03535| 23.48585|
|790026|  13510|           0|              0|               1|Infinity|-Infinity|
|790048|   9162|           0|              0|               0|    33.5|     30.5|
|790048|  13509|           0|              0|               1|Infinity|-Infinity|
|790048|  13510|           0|              0|               0|     NaN|      NaN|
|790048|   9160|           0|              1|               0|33.94075| 30.75925|
+------+-------+------------+---------------+----------------+--------+---------+

选择用于自连接的新数据框

并为每个 ID 首次出现 is_golden_limit 值

w=Window().partitionBy("SEQ_ID").orderBy("SEQ_ID")
df1=df.select(F.col("is_golden_limit").alias("use_golden_limit"),F.col("New_UL").alias("New_UL1"),F.col("New_LL").alias("New_LL1"),"SEQ_ID").filter(F.col("is_golden_limit")==1).withColumn('row_num',F.row_number().over(w)).filter(F.col("row_num")==1).drop("row_num")

+----------------+--------+--------+------+
|use_golden_limit| New_UL1| New_LL1|SEQ_ID|
+----------------+--------+--------+------+
|               1| 26.1184| 23.2954|790026|
|               1|33.94075|30.75925|790048|
+----------------+--------+--------+------+

使用条件连接和创建新列

df1 自然会是一个小得多的数据帧,因此,最好使用广播连接(向所有节点广播小数据帧,以便在连接中更好地协同定位)。

df2=df.join(df1.hint("broadcast"), on=['use_golden_limit','SEQ_ID'], how='left')
df3=df2.withColumn("New_UL_Final", F.when((F.col("use_golden_limit")==1),F.col("New_UL1")).otherwise(F.col("New_UL")))\
   .withColumn("New_LL_Final", F.when((F.col("use_golden_limit")==1),F.col("New_LL1")).otherwise(F.col("New_LL")))\
   .orderBy("SEQ_ID").drop("New_UL","New_LL","New_LL1","New_UL1")

选择最终数据框和 .show()

df4=df3.select("SEQ_ID","TOOL_ID","isfleetlevel","is_golden_limit","use_golden_limit",F.col("New_UL_Final").alias("New_UL"),
          F.col("New_LL_Final").alias("New_LL"))
df4.show()

最终数据框:

+------+-------+------------+---------------+----------------+--------+--------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|  New_UL|  New_LL|
+------+-------+------------+---------------+----------------+--------+--------+
|790026|  13510|           0|              0|               1| 26.1184| 23.2954|
|790026|   9162|           0|              0|               0|25.03535|23.48585|
|790026|  13509|           0|              0|               1| 26.1184| 23.2954|
|790026|   9160|           0|              1|               0| 26.1184| 23.2954|
|790048|  13509|           0|              0|               1|33.94075|30.75925|
|790048|   9160|           0|              1|               0|33.94075|30.75925|
|790048|   9162|           0|              0|               0|    33.5|    30.5|
|790048|  13510|           0|              0|               0|     NaN|     NaN|
+------+-------+------------+---------------+----------------+--------+--------+

【讨论】:

  • 谢谢!使用 isgoldenlimit 作为 usegoldenlimit 是一种简洁的解决方案。我有一个担心:云中的连接很昂贵,广播文档提到连接的最大表大小为 10MB。这种方法是否适用于 1B 行或更多行的数据帧?
  • 你说得对,默认是 10MB,但是最近 Spark 将广播表的最大大小从 2GB 增加到 8GB(如果您使用的是最新版本)。您可以将 spark.sql.autoBroadcastJoinThreshold 设置为最多 8 个演出。
  • 8GB广播源代码:github.com/apache/spark/blob/…优化连接链接:medium.com/datakaresolutions/…
  • 在您的用例中,即使您有超过 10 亿行,df1(只有 isgoldenlimit==1 的较小数据帧)也不应该超过 8GB(也因为它只有 3 列)。与大表的广播连接将得到高度优化。
猜你喜欢
  • 2012-11-06
  • 2021-07-02
  • 1970-01-01
  • 2021-10-01
  • 2022-07-27
  • 2020-03-22
  • 2019-02-03
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多