【Question title】:Spark, SQL aggregation based on a second data set
【Posted】:2019-03-14 22:50:07
【Question】:

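I have two datasets (DataFrames):

  1. idPeersDS - has an id column and a peers column holding the ids of that id's peers.
  2. infoDS - has an id column, two type columns (type1, type2), and a metric column.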

--

idPeersDS
+---+---------+
| id|    peers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[2, 1, 6]|
|  3|[3, 1, 2]|
|  4|[4, 5, 6]|
|  5|[5, 4, 6]|
|  6|[6, 1, 2]|
+---+---------+


infoDS
+---+-----+-----+------+
| id|type1|type2|metric|
+---+-----+-----+------+
|  1|    A|    X|  10.0|
|  1|    A|    Y|  20.0|
|  1|    B|    X|  30.0|
|  1|    B|    Y|  40.0|
|  2|    A|    Y|  10.0|
|  2|    B|    X|  20.0|
|  2|    B|    Y|  30.0|
|  3|    A|    X|  40.0|
|  4|    B|    Y|  10.0|
|  5|    A|    X|  20.0|
|  5|    B|    X|  30.0|
|  6|    A|    Y|  40.0|
|  6|    B|    Y|  10.0|
+---+-----+-----+------+

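I need to calculate the z-score of the metric for each id, grouped by type1 and type2. However, it is not the z-score over the grouped data; it is the z-score over the metrics of the id's peers within the group. If a peer id has no metric in the group, that peer's metric is treated as 0.

Example: for group ("A", "X") and id = 1, the peers are (1, 2, 3), so the metrics used for the z-score are (10, 0, 40); id = 2 does not exist in group ("A", "X"), so its metric is 0. id = 5 is not a peer of id = 1, so it is not part of the z-score calculation.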

+---+------+---------+-----+-----+
| id|metric|    peers|type1|type2|
+---+------+---------+-----+-----+
|  1|  10.0|[1, 2, 3]|    A|    X|
|  3|  40.0|[3, 1, 2]|    A|    X|
|  5|  20.0|[5, 4, 6]|    A|    X|
+---+------+---------+-----+-----+

Z = (X - μ) / σ
Z = (10 - 16.66666) / 16.99673
Z = -0.39223
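
The hand calculation above can be reproduced with a few lines of plain Scala. This is only a minimal sketch: the helper name `zScore` is hypothetical (it is not part of the question's code), it uses the population standard deviation to match `getPopulationVariance` in the question's UDF, and it assumes a non-empty list of peer metrics.

```scala
// Hypothetical helper: population z-score of `x` relative to `values`.
// Returns 0.0 when the standard deviation is zero, mirroring the guard in the question's UDF.
// Assumes `values` is non-empty.
def zScore(x: Double, values: Seq[Double]): Double = {
  val mean = values.sum / values.size
  val variance = values.map(v => math.pow(v - mean, 2)).sum / values.size
  val sd = math.sqrt(variance)
  if (sd == 0.0) 0.0 else (x - mean) / sd
}

// Metrics of peers (1, 2, 3) in group ("A", "X"); id = 2 is missing there, so it counts as 0.
val peerMetrics = Seq(10.0, 0.0, 40.0)
val z = zScore(10.0, peerMetrics)
println(z) // approximately -0.39223, matching the calculation above
```

The snippet can be pasted into spark-shell or any Scala REPL; it does not depend on Spark.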

The output should be the following table. I can compute the z-score myself if I can get a `peersmetrics` column in place of the `zScoreValue` column that my code produces.
    +---+------+---------+-----------+-----+-----+------------+
    | id|metric|    peers|zScoreValue|type1|type2|peersmetrics|
    +---+------+---------+-----------+-----+-----+------------+
    |  1|  10.0|[1, 2, 3]|      -0.39|    A|    X| [10, 0, 40]|
    |  3|  40.0|[3, 1, 2]|       1.37|    A|    X| [40, 10, 0]|
    |  5|  20.0|[5, 4, 6]|       1.41|    A|    X|  [20, 0, 0]|
    |  1|  40.0|[1, 2, 3]|       0.98|    B|    Y| [40, 30, 0]|
    |  2|  30.0|[2, 1, 6]|       0.27|    B|    Y|[30, 40, 10]|
    |  4|  10.0|[4, 5, 6]|       0.71|    B|    Y|            |
    |  6|  10.0|[6, 1, 2]|      -1.34|    B|    Y|            |
    |  1|  30.0|[1, 2, 3]|       1.07|    B|    X|            |
    |  2|  20.0|[2, 1, 6]|       0.27|    B|    X|            |
    |  5|  30.0|[5, 4, 6]|       1.41|    B|    X|            |
    |  1|  20.0|[1, 2, 3]|       1.22|    A|    Y|            |
    |  2|  10.0|[2, 1, 6]|      -1.07|    A|    Y|            |
    |  6|  40.0|[6, 1, 2]|       1.34|    A|    Y|            |
    +---+------+---------+-----------+-----+-----+------------+

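Edit 1: A SQL solution is equally welcome. I can convert the SQL to Scala code in my Spark job.

Below is my solution, but it takes longer to compute than I would like. Size of the real datasets: idPeersDS has 17000 rows and infoDS has 17000 * 6 * 15 rows.

Any other solution is greatly appreciated. Feel free to edit or suggest a better title and to correct the grammar. English is not my first language. Thanks.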

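Here is my code.

// Imports the code below relies on (spark-shell already provides spark.implicits._)
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{lit, round, udf}
import scala.collection.mutable.WrappedArray

val idPeersDS = Seq(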
  (1, Seq(1,2,3)),
  (2, Seq(2,1,6)),
  (3, Seq(3,1,2)),
  (4, Seq(4,5,6)),
  (5, Seq(5,4,6)),
  (6, Seq(6,1,2))
).toDS.select($"_1" as "id", $"_2" as "peers")

val infoDS = Seq(
  (1, "A", "X", 10),
  (1, "A", "Y", 20),
  (1, "B", "X", 30),
  (1, "B", "Y", 40),
  (2, "A", "Y", 10),
  (2, "B", "X", 20),
  (2, "B", "Y", 30),
  (3, "A", "X", 40),
  (4, "B", "Y", 10),
  (5, "A", "X", 20),
  (5, "B", "X", 30),
  (6, "A", "Y", 40),
  (6, "B", "Y", 10)
).toDS.select($"_1" as "id", $"_2" as "type1", $"_3" as "type2", $"_4" cast "double" as "metric")




def calculateZScoreGivenPeers(idMetricDS: DataFrame, irPeersDS: DataFrame, roundTo: Int = 2)
(implicit spark: SparkSession): DataFrame = {

  import spark.implicits._

  // for every id in the idMetricDS, get the peers and their metric for zscore, calculate zscore
  val fir = idMetricDS.join(irPeersDS, "id")
  val fsMapBroadcast = spark.sparkContext.broadcast(
    idMetricDS.toDF.map((r: Row) => {r.getInt(0) -> r.getDouble(1)}).rdd.collectAsMap)
  val fsMap = fsMapBroadcast.value
  val funUdf = udf((currId: Int, xs: WrappedArray[Int]) => {
    val zScoreMetrics: Array[Double] = xs.toArray.map(x => fsMap.getOrElse(x, 0.0))
    val ds = new DescriptiveStatistics(zScoreMetrics)
    val mean = ds.getMean()
    val sd = Math.sqrt(ds.getPopulationVariance())
    val zScore = if (sd == 0.0) {0.0} else {(fsMap.getOrElse(currId, 0.0)- mean) / sd}
    zScore
  })

  val idStatsWithZscoreDS =
    fir.withColumn("zScoreValue", round(funUdf(fir("id"), fir("peers")), roundTo))
  fsMapBroadcast.unpersist
  fsMapBroadcast.destroy
  return idStatsWithZscoreDS

}

val typesComb = infoDS.select("type1", "type2").dropDuplicates.collect

val zScoreDS = typesComb.map(
  ept => {
    val et = ept.getString(0)
    val pt = ept.getString(1)
    val idMetricDS = infoDS.where($"type1" === lit(et) && $"type2" === lit(pt)).select($"id", $"metric")
    val zScoreDS = calculateZScoreGivenPeers(idMetricDS, idPeersDS)(spark)
    zScoreDS.select($"id", $"metric", $"peers", $"zScoreValue").withColumn("type1", lit(et)).withColumn("type2", lit(pt))
  }
).reduce(_.union(_))


scala> idPeersDS.show(100)
+---+---------+
| id|    peers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[2, 1, 6]|
|  3|[3, 1, 2]|
|  4|[4, 5, 6]|
|  5|[5, 4, 6]|
|  6|[6, 1, 2]|
+---+---------+


scala> infoDS.show(100)
+---+-----+-----+------+
| id|type1|type2|metric|
+---+-----+-----+------+
|  1|    A|    X|  10.0|
|  1|    A|    Y|  20.0|
|  1|    B|    X|  30.0|
|  1|    B|    Y|  40.0|
|  2|    A|    Y|  10.0|
|  2|    B|    X|  20.0|
|  2|    B|    Y|  30.0|
|  3|    A|    X|  40.0|
|  4|    B|    Y|  10.0|
|  5|    A|    X|  20.0|
|  5|    B|    X|  30.0|
|  6|    A|    Y|  40.0|
|  6|    B|    Y|  10.0|
+---+-----+-----+------+


scala> typesComb
res3: Array[org.apache.spark.sql.Row] = Array([A,X], [B,Y], [B,X], [A,Y])

scala> zScoreDS.show(100)
+---+------+---------+-----------+-----+-----+
| id|metric|    peers|zScoreValue|type1|type2|
+---+------+---------+-----------+-----+-----+
|  1|  10.0|[1, 2, 3]|      -0.39|    A|    X|
|  3|  40.0|[3, 1, 2]|       1.37|    A|    X|
|  5|  20.0|[5, 4, 6]|       1.41|    A|    X|
|  1|  40.0|[1, 2, 3]|       0.98|    B|    Y|
|  2|  30.0|[2, 1, 6]|       0.27|    B|    Y|
|  4|  10.0|[4, 5, 6]|       0.71|    B|    Y|
|  6|  10.0|[6, 1, 2]|      -1.34|    B|    Y|
|  1|  30.0|[1, 2, 3]|       1.07|    B|    X|
|  2|  20.0|[2, 1, 6]|       0.27|    B|    X|
|  5|  30.0|[5, 4, 6]|       1.41|    B|    X|
|  1|  20.0|[1, 2, 3]|       1.22|    A|    Y|
|  2|  10.0|[2, 1, 6]|      -1.07|    A|    Y|
|  6|  40.0|[6, 1, 2]|       1.34|    A|    Y|
+---+------+---------+-----------+-----+-----+

【Comments】:

    Tags: sql scala apache-spark


    【Solution 1】:

    I solved it. Here is my answer. This solution does run significantly faster than the solution from my question on the real datasets.

    val idPeersDS = Seq(
      (1, Seq(1,2,3)),
      (2, Seq(2,1,6)),
      (3, Seq(3,1,2)),
      (4, Seq(4,5,6)),
      (5, Seq(5,4,6)),
      (6, Seq(6,1,2))
    ).toDS.select($"_1" as "id", $"_2" as "peers")
    
    val infoDS = Seq(
      (1, "A", "X", 10),
      (1, "A", "Y", 20),
      (1, "B", "X", 30),
      (1, "B", "Y", 40),
      (2, "A", "Y", 10),
      (2, "B", "X", 20),
      (2, "B", "Y", 30),
      (3, "A", "X", 40),
      (4, "B", "Y", 10),
      (5, "A", "X", 20),
      (5, "B", "X", 30),
      (6, "A", "Y", 40),
      (6, "B", "Y", 10)
    ).toDS.select($"_1" as "id", $"_2" as "type1", $"_3" as "type2", $"_4" cast "double" as "metric")
    
    
    // Exiting paste mode, now interpreting.
    
    idPeersDS: org.apache.spark.sql.DataFrame = [id: int, peers: array<int>]
    infoDS: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 2 more fields]
    
    scala> idPeersDS.show
    +---+---------+
    | id|    peers|
    +---+---------+
    |  1|[1, 2, 3]|
    |  2|[2, 1, 6]|
    |  3|[3, 1, 2]|
    |  4|[4, 5, 6]|
    |  5|[5, 4, 6]|
    |  6|[6, 1, 2]|
    +---+---------+
    
    
    scala> infoDS.show
    +---+-----+-----+------+
    | id|type1|type2|metric|
    +---+-----+-----+------+
    |  1|    A|    X|  10.0|
    |  1|    A|    Y|  20.0|
    |  1|    B|    X|  30.0|
    |  1|    B|    Y|  40.0|
    |  2|    A|    Y|  10.0|
    |  2|    B|    X|  20.0|
    |  2|    B|    Y|  30.0|
    |  3|    A|    X|  40.0|
    |  4|    B|    Y|  10.0|
    |  5|    A|    X|  20.0|
    |  5|    B|    X|  30.0|
    |  6|    A|    Y|  40.0|
    |  6|    B|    Y|  10.0|
    +---+-----+-----+------+
    
    
    scala> val infowithpeers = infoDS.join(idPeersDS, "id")
    infowithpeers: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 3 more fields]
    
    scala> infowithpeers.show
    +---+-----+-----+------+---------+
    | id|type1|type2|metric|    peers|
    +---+-----+-----+------+---------+
    |  1|    A|    X|  10.0|[1, 2, 3]|
    |  1|    A|    Y|  20.0|[1, 2, 3]|
    |  1|    B|    X|  30.0|[1, 2, 3]|
    |  1|    B|    Y|  40.0|[1, 2, 3]|
    |  2|    A|    Y|  10.0|[2, 1, 6]|
    |  2|    B|    X|  20.0|[2, 1, 6]|
    |  2|    B|    Y|  30.0|[2, 1, 6]|
    |  3|    A|    X|  40.0|[3, 1, 2]|
    |  4|    B|    Y|  10.0|[4, 5, 6]|
    |  5|    A|    X|  20.0|[5, 4, 6]|
    |  5|    B|    X|  30.0|[5, 4, 6]|
    |  6|    A|    Y|  40.0|[6, 1, 2]|
    |  6|    B|    Y|  10.0|[6, 1, 2]|
    +---+-----+-----+------+---------+
    
    
    scala> val joinMap = udf { values: Seq[Map[Int,Double]] => values.flatten.toMap }
    joinMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(IntegerType,DoubleType,false),Some(List(ArrayType(MapType(IntegerType,DoubleType,false),true))))
    
    scala> val zScoreCal = udf { (metric: Double, zScoreMetrics: WrappedArray[Double]) =>
        |   val ds = new DescriptiveStatistics(zScoreMetrics.toArray)
        |   val mean = ds.getMean()
        |   val sd = Math.sqrt(ds.getPopulationVariance())
        |   val zScore = if (sd == 0.0) {0.0} else {(metric - mean) / sd}
        |   zScore
        | }
    zScoreCal: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,DoubleType,Some(List(DoubleType, ArrayType(DoubleType,false))))
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    
    val infowithpeersidmetric = infowithpeers.withColumn("idmetric", map($"id",$"metric"))
    val idsingrpdf = infowithpeersidmetric.groupBy("type1","type2").agg(joinMap(collect_list(map($"id", $"metric"))) as "idsingrp")
    
    val metricsMap = udf { (peers: Seq[Int], values: Map[Int,Double]) => {
        peers.map(p => values.getOrElse(p,0.0))
      }
    }
    
    // Exiting paste mode, now interpreting.
    
    infowithpeersidmetric: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 4 more fields]
    idsingrpdf: org.apache.spark.sql.DataFrame = [type1: string, type2: string ... 1 more field]
    metricsMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(DoubleType,false),Some(List(ArrayType(IntegerType,false), MapType(IntegerType,DoubleType,false))))
    
    scala> val infoWithMap = infowithpeers.join(idsingrpdf, Seq("type1","type2")).withColumn("zScoreMetrics", metricsMap($"peers", $"idsingrp")).withColumn("zscore", round(zScoreCal($"metric",$"zScoreMetrics"),2))
    infoWithMap: org.apache.spark.sql.DataFrame = [type1: string, type2: string ... 6 more fields]
    
    scala> infoWithMap.show
    +-----+-----+---+------+---------+--------------------+------------------+------+
    |type1|type2| id|metric|    peers|            idsingrp|     zScoreMetrics|zscore|
    +-----+-----+---+------+---------+--------------------+------------------+------+
    |    A|    X|  1|  10.0|[1, 2, 3]|[3 -> 40.0, 5 -> ...| [10.0, 0.0, 40.0]| -0.39|
    |    A|    Y|  1|  20.0|[1, 2, 3]|[2 -> 10.0, 6 -> ...| [20.0, 10.0, 0.0]|  1.22|
    |    B|    X|  1|  30.0|[1, 2, 3]|[1 -> 30.0, 2 -> ...| [30.0, 20.0, 0.0]|  1.07|
    |    B|    Y|  1|  40.0|[1, 2, 3]|[4 -> 10.0, 1 -> ...| [40.0, 30.0, 0.0]|  0.98|
    |    A|    Y|  2|  10.0|[2, 1, 6]|[2 -> 10.0, 6 -> ...|[10.0, 20.0, 40.0]| -1.07|
    |    B|    X|  2|  20.0|[2, 1, 6]|[1 -> 30.0, 2 -> ...| [20.0, 30.0, 0.0]|  0.27|
    |    B|    Y|  2|  30.0|[2, 1, 6]|[4 -> 10.0, 1 -> ...|[30.0, 40.0, 10.0]|  0.27|
    |    A|    X|  3|  40.0|[3, 1, 2]|[3 -> 40.0, 5 -> ...| [40.0, 10.0, 0.0]|  1.37|
    |    B|    Y|  4|  10.0|[4, 5, 6]|[4 -> 10.0, 1 -> ...| [10.0, 0.0, 10.0]|  0.71|
    |    A|    X|  5|  20.0|[5, 4, 6]|[3 -> 40.0, 5 -> ...|  [20.0, 0.0, 0.0]|  1.41|
    |    B|    X|  5|  30.0|[5, 4, 6]|[1 -> 30.0, 2 -> ...|  [30.0, 0.0, 0.0]|  1.41|
    |    A|    Y|  6|  40.0|[6, 1, 2]|[2 -> 10.0, 6 -> ...|[40.0, 20.0, 10.0]|  1.34|
    |    B|    Y|  6|  10.0|[6, 1, 2]|[4 -> 10.0, 1 -> ...|[10.0, 40.0, 30.0]| -1.34|
    +-----+-----+---+------+---------+--------------------+------------------+------+
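
As a cross-check of the steps above, here is a minimal sketch of the same logic in plain Scala collections, without Spark. It builds one id -> metric map per (type1, type2) group (the role of the `idsingrp` column), looks up each peer's metric with a default of 0.0 (the role of `metricsMap`), and then computes the population z-score. The names `info`, `peersById`, `metricsByGroup`, and `zScore` are hypothetical and chosen only for this illustration; the data is copied from the question.

```scala
// Sample data from the question: (id, type1, type2, metric) rows and id -> peers.
val info = Seq(
  (1, "A", "X", 10.0), (1, "A", "Y", 20.0), (1, "B", "X", 30.0), (1, "B", "Y", 40.0),
  (2, "A", "Y", 10.0), (2, "B", "X", 20.0), (2, "B", "Y", 30.0),
  (3, "A", "X", 40.0), (4, "B", "Y", 10.0),
  (5, "A", "X", 20.0), (5, "B", "X", 30.0),
  (6, "A", "Y", 40.0), (6, "B", "Y", 10.0)
)
val peersById = Map(
  1 -> Seq(1, 2, 3), 2 -> Seq(2, 1, 6), 3 -> Seq(3, 1, 2),
  4 -> Seq(4, 5, 6), 5 -> Seq(5, 4, 6), 6 -> Seq(6, 1, 2)
)

// Population z-score; 0.0 when the standard deviation is zero (same guard as the UDF).
def zScore(x: Double, values: Seq[Double]): Double = {
  val mean = values.sum / values.size
  val sd = math.sqrt(values.map(v => math.pow(v - mean, 2)).sum / values.size)
  if (sd == 0.0) 0.0 else (x - mean) / sd
}

// Step 1: one id -> metric lookup per (type1, type2) group.
val metricsByGroup: Map[(String, String), Map[Int, Double]] =
  info.groupBy { case (_, t1, t2, _) => (t1, t2) }
    .map { case (key, rows) => key -> rows.map { case (id, _, _, m) => id -> m }.toMap }

// Step 2: per row, collect the peers' metrics in that group (0.0 if a peer is absent),
// then compute the row's z-score against those metrics.
val result = info.map { case (id, t1, t2, metric) =>
  val peerMetrics = peersById(id).map(p => metricsByGroup((t1, t2)).getOrElse(p, 0.0))
  (id, t1, t2, metric, peerMetrics, zScore(metric, peerMetrics))
}

result.foreach(println)
```

Because the lookup map is built once per group rather than once per (type1, type2) pass over the data, this mirrors why the answer avoids the per-group loop and repeated `union` from the question.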
    

    【Comments】:
