【问题标题】:Divide elements of column by a sum of elements (of same column) grouped by elements of another column将列的元素除以由另一列的元素分组的(同一列的)元素总和
【发布时间】:2019-01-10 23:04:51
【问题描述】:

我一直在研究 aSspark 应用程序,并试图转换表 1 中所示的数据框。我想将列 (_2) 的每个元素除以由另一个元素分组的元素总和(同一列的)列 (_1)。表2是预期结果。

表 1

+---+---+
| _1| _2|
+---+---+
|  0| 13|
|  0|  7|
|  0|  3|
|  0|  1|
|  0|  1|
|  1|  4|
|  1|  8|
|  1| 18|
|  1|  4|
+---+---+

表 2

+---+----+
| _1| _2 |
+---+----+
|  0|13/x|
|  0| 7/x|
|  0| 3/x|
|  0| 1/x|
|  0| 1/x|
|  1| 4/y|
|  1| 8/y|
|  1|18/y|
|  1| 4/y|
+---+----+

其中,x= (13+7+3+1+1) 和 y = (4+8+18+4)

然后,我想计算 _1 列中每个元素的熵: 即,对于 _1 列中的每个元素,计算 _2 列中的 sum(p_i x log(p_i))。其中,p_i 基本上是 表 2 中 _1 列中每个值的 _2 列中的值。

最终的输出是。

+---+---------+
| _1| ENTROPY |
+---+---------+
|  0|entropy_1|
|  1|entropy_2|
+---+---------+

如何在 spark 中实现它(最好在 scala 中)?执行上述操作的优化方式是什么?我是 scala 的新手,任何相关建议都将受到高度赞赏。

谢谢。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    如果您想要一个简洁的解决方案并且组相当小,您可以使用窗口函数。首先你必须定义一个窗口:

    import org.apache.spark.sql.expressions.Window
    
    val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
    

    概率:

    import org.apache.spark.sql.functions.sum
    
    val p = $"_2" / sum($"_2").over(w)
    val withP = df.withColumn("p", p)
    

    最后是熵:

    import org.apache.spark.sql.functions.log2
    
    withP.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
    

    对于示例数据

    val df = Seq(
      (0, 13), (0, 7), (0, 3), (0, 1), (0, 1), (1, 4), (1, 8), (1, 18), (1, 4)).toDF
    

    结果是:

    +---+------------------+
    | _1|           entropy|
    +---+------------------+
    |  1|1.7033848993102918|
    |  0|1.7433726580786888|
    +---+------------------+
    

    如果窗口函数在性能方面不可接受,您可以尝试aggregation-join-aggregation:

    df.groupBy($"_1").agg(sum("_2").alias("total"))
      .join(df, Seq("_1"), "inner")
      .withColumn("p", $"_2" / $"total")
      .groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
    

    地点:

    df.groupBy($"_1").agg(sum("_2").alias("total"))
    

    通过_1计算_2的总和,

    _.join(df, Seq("_1"), "inner")
    

    将聚合列添加到原始数据中,

    _.withColumn("p", $"_2" / $"total")
    

    计算概率,并且:

    _.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
    

    聚合得到熵。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-08-31
      • 2012-07-11
      • 1970-01-01
      • 2018-08-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-06-22
      相关资源
      最近更新 更多