【问题标题】:How to Find the maximum consecutive years for each ID using Scala / Spark如何使用 Scala / Spark 查找每个 ID 的最大连续年份
【发布时间】:2016-08-15 03:54:54
【问题描述】:

我对每一行都有特定的 ID 和相应的运营年份:

例子:

ID   YEAR

A1   1999
A2   2000
A1   2000
B1   1998
A1   2002

现在,我需要确定每个 ID 的连续年数 结果,

A1 : 2  because[1999, 2000 ] 

等等,

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    如果您不想打扰 Spark SQL(在我看来,这对任务来说太过分了),您可以简单地使用 groupByKey (虽然每个 id 可能的年数是合理的)

    val rdd = sc.parallelize(Seq(
      ("A1", 1999),
      ("A2", 2000),
      ("A1", 2000),
      ("A1", 1998),
      ("A1", 2002),
      ("B1", 1998)
    ))
    
    def findMaxRange(l: Iterable[Int]) = {
      val ranges = mutable.ArrayBuffer[Int](1)
      l.toSeq.sorted.distinct.sliding(2).foreach { case y1 :: tail =>
        if (tail.nonEmpty) {
          val y2 = tail.head
          if (y2 - y1 == 1)  ranges(ranges.size - 1) +=  1
          else ranges += 1
        }
      }
      ranges.max
    }
    
    rdd1.groupByKey.map(r => (r._1, findMaxRange(r._2))).collect()
    
    res7: Array[(String, Int)] = Array((A1,3), (A2,1), (B1,1))
    

    【讨论】:

      【解决方案2】:

      如果您想要Spark 解决方案,我会选择DataFrame。它变得混乱,但这是一个有趣的问题:

      val testDf = Seq(
        ("A1", 1999),
        ("A2", 2000),
        ("A1", 2000),
        ("A1", 1998),
        ("A1", 2002),
        ("B1", 1998)
      ).toDF("ID", "YEAR")
      

      然后我会执行一个自连接(实际上是两个中的第一个):

      val selfJoined = testDf.orderBy($"YEAR").join(
        testDf.orderBy($"YEAR").toDF("R_ID", "R_YEAR"),
        $"R_ID" === $"ID" && $"YEAR" === ($"R_YEAR" - 1),
        "full_outer"
      ).filter($"ID".isNull || $"R_ID".isNull)
      
      selfJoined.show
      +----+----+----+------+
      |  ID|YEAR|R_ID|R_YEAR|
      +----+----+----+------+
      |null|null|  A2|  2000|
      |  A2|2000|null|  null|
      |null|null|  B1|  1998|
      |  B1|1998|null|  null|
      |null|null|  A1|  1998|
      |  A1|2000|null|  null|
      |null|null|  A1|  2002|
      |  A1|2002|null|  null|
      +----+----+----+------+
      

      从上面可以看出,我们现在有了连续年份的开始和结束日期。 R_YEAR,如果不是null,则包含连续年份“运行”的开始。下一行,YEAR 是这几年的结束。如果我更擅长Window 功能,我可能会使用lag 将记录拼接在一起,但我不会,所以我不会。我会再做一次自加入,然后是groupBy,然后是select 中的一些数学运算,然后是另一个groupBy

      selfJoined.filter($"ID".isNull).as("a").join(
        selfJoined.filter($"R_ID".isNull).as("b"),
        $"a.R_ID" === $"b.ID" && $"a.R_YEAR" <= $"b.YEAR"
      ).groupBy($"a.R_ID", $"a.R_YEAR").agg(min($"b.YEAR") as "last_YEAR")
       .select($"R_ID" as "ID", $"last_YEAR" - $"R_YEAR" + 1 as "inarow")
       .groupBy($"ID").agg(max($"inarow") as "MAX").show
      +---+---+
      | ID|MAX|
      +---+---+
      | B1|  1|
      | A1|  3|
      | A2|  1|
      +---+---+
      

      哇!

      【讨论】:

      • 感谢您的时间和详细的步骤说明,非常感谢您的解决方案!。但是,我想要的输出应该是 A1 -> 2,因为 A1 有 1999、2000、2002(这里 2002 不是连续年份。)
      • 看我的数据,不一样的。我为 A1 又增加了一年,以获得更好的案例。
      • David Griffin 能否请您解释一下逻辑部分、多个自连接和其背后的算术。累了我尽力去掌握它看起来很乏味,想确认我的解释是否正确。
      【解决方案3】:

      我会尝试以下方式:

      scala> case class DataRow(id: String, year: Int)
      defined class DataRow
      scala> val data = Seq(
                 DataRow("A1", 1999),
                 DataRow("A2", 2000),
                 DataRow("A1", 2000),
                 DataRow("B1", 1998),
                 DataRow("A1", 2002)
               )
      data: Seq[DataRow] = List(DataRow("A1", 1999), DataRow("A2", 2000), DataRow("A1", 2000), DataRow("B1", 1998), DataRow("A1", 2002))
      scala> data.groupBy(_.id).mapValues { rows =>
                 val years = rows.map(_.year)
                 val firstYear = years.head
                 years.zipWithIndex.takeWhile { case (y, i) => y == firstYear + i }.size
               }
      res1: Map[String, Int] = Map("B1" -> 1, "A2" -> 1, "A1" -> 2)
      

      这会计算每个 ID 的最大连续年数,假设它看到的第一年是罢工的最早日期。在val years 行中插入.sorted 不是这种情况。

      【讨论】:

      • 那不使用Spark
      猜你喜欢
      • 1970-01-01
      • 2021-01-19
      • 2020-12-19
      • 2018-03-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多