【问题标题】:Pyspark calculate row-wise weighted average with null entriesPyspark 使用空条目计算行加权平均值
【发布时间】:2021-07-29 15:35:32
【问题描述】:

我有多个数据框,其值是根据不同的源数据计算得出的。为简单起见,我将举一个包含三个数据帧的示例,但我正在寻找一个包含 n 个数据帧的解决方案

数据_1

+------+-----------+
|person|first_value|
+------+-----------+
|     1|        1.0|
|     2|        0.9|
|     3|        0.8|
|     4|        0.7|
+------+-----------+

数据_2

+------+------------+
|person|second_value|
+------+------------+
|     1|         0.5|
|     2|         0.6|
|     4|         0.7|
+------+------------+

data_3

+------+-----------+
|person|third_value|
+------+-----------+
|     1|        0.2|
|     3|        0.9|
|     4|        0.6|
+------+-----------+

现在我想计算两个或多个数据帧的加权平均值 - 为此我首先合并数据帧

+------+-----------+------------+-----------+
|person|first_value|second_value|third_value|
+------+-----------+------------+-----------+
|     1|        1.0|         0.5|        0.2|
|     2|        0.9|         0.6|       null|
|     3|        0.8|        null|        0.9|
|     4|        0.8|         0.7|        0.6|
+------+-----------+------------+-----------+

组合值的公式为:

val = val1 * weight1 + val2 * weight2 + val3 * weight3

但是,如果其中一个值为空,则另一个值应该能够加起来为 1,因此如果 val2 为空,那么权重 2 应该分配给所有其他权重。我只是找不到一种优雅的方式来做到这一点。

在 w1 = 0.3, w2 = 0.4, w3 = 0.3 的情况下,我目前得到这个是因为我的公式:

+------+----+
|person| val|
+------+----+
|     3|null|
|     1|0.56|
|     4| 0.7|
|     2|null|
+------+----+

但是我想要这个:

+------+-----+
|person|  val|
+------+-----+
|     1| 0.56|
|     2|0.729|  <- val1*weight1_adj2 + val2*weight2_adj2
|     3| 0.85|  <- val1*weight1_adj3 + val3*weight3_adj3
|     4|  0.7|
+------+-----+

调整权重

weight1_adj2 = w1/(w1+w2) = 0.57
weight2_adj2 = w2/(w1+w2) = 0.43
weight1_adj3 = w1/(w1+w3) = 0.5
weight3_adj3 = w3/(w1/w3) = 0.5

有没有办法在 pyspark 甚至 sql 中解决这个问题,或者我必须进入 udf 吗?

这是我当前不处理空值的代码:

data1 = [("1",1.0), 
        ("2",0.9), 
        ("3",0.8), 
        ("4",0.8) 
      ]

schema1 = ["person","first_value"]
first_df = spark.createDataFrame(data=data1, schema = schema1)

data2 = [("1",0.5), 
        ("2",0.6), 
        ("4",0.7) 
      ]

schema2 = ["person","second_value"]
second_df = spark.createDataFrame(data=data2, schema = schema2)

data3 = [("1",0.2), 
        ("3",0.9), 
        ("4",0.6) 
      ]

schema3 = ["person","third_value"]
third_df = spark.createDataFrame(data=data3, schema = schema3)

combined_df = first_df.join(
  second_df, ['person'], how='full'
).join(
  third_df, ['person'], how='full'
)

w1 = 0.3
w2 = 0.4
w3 = 0.3
combined_df.groupBy(['person']).agg(
  F.sum(
    col('first_value')*w1 + col('second_value')*w2  + col('third_value')*w3
  ).alias('val')).show()

Edit1:我不是要按此处所述按行添加空值:Spark dataframe not adding columns with null values - 我需要处理权重,以便乘以非空值的权重总和始终为 1

【问题讨论】:

    标签: python apache-spark pyspark rdd


    【解决方案1】:

    想法是将列不为空的每行的所有权重相加,然后将各个权重除以该总和。

    为了在列数及其权重方面获得一定的灵活性,我将权重存储在一个字典中,使用列名作为键:

    weights = {"first_value": 0.3, "second_value": 0.4, "third_value": 0.3}
    

    然后我可以遍历dict到

    • 计算非空列的权重总和
    • 然后计算value of column * weight / sum of weights 的所有非列的总和
    wf = "1 / ("
    val = ""
    for col in weights:
        wf += f"if({col} is null,0 ,{weights[col]}) + "
        val += f"if( {col} is null, 0, {col} * {weights[col]} * weight_factor) + "
    wf += "0 )"
    val += "0"
    
    combined_df = combined_df.withColumn("weight_factor", F.expr(wf)) \
        .withColumn("val", F.expr(val))
    

    输出:

    +------+-----------+------------+-----------+-----------------+------------------+
    |person|first_value|second_value|third_value|    weight_factor|               val|
    +------+-----------+------------+-----------+-----------------+------------------+
    |     1|        1.0|         0.5|        0.2|1.000000000000000|              0.56|
    |     2|        0.9|         0.6|       null|1.428571428571429|0.7285714285714289|
    |     3|        0.8|        null|        0.9|1.666666666666667|0.8500000000000002|
    |     4|        0.8|         0.7|        0.6|1.000000000000000|               0.7|
    +------+-----------+------------+-----------+-----------------+------------------+
    

    下一步,您可以继续对val 进行聚合和求和。

    【讨论】:

    • 谢谢,我是 Spark 的新手,不知道 F 表达式。
    • 我同时为自己解决了这个问题,方法是为每个值创建一个权重列,该列与基于值 beeing/not beeing null 的布尔值相乘:.withColumn("w3", (lit( weight_settings['third_value']) * (~col('third_value').isNull()).cast('integer')) ) 然后将其除以所有权重的行和。不过,您的解决方案似乎更简单。
    猜你喜欢
    • 2016-12-14
    • 2011-02-12
    • 2021-11-24
    • 2010-10-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-02
    • 1970-01-01
    相关资源
    最近更新 更多