【问题标题】:How to GroupBy Spark DataFrame with my Equality Comparators?如何使用我的平等比较器 GroupBy Spark DataFrame?
【发布时间】:2019-03-13 16:40:11
【问题描述】:

我想在带有我自己的相等比较器的 DataFrame 上使用 GroupBy 运算符。

假设我想执行如下操作:

df.groupBy("Year","Month").sum("Counter")

在这个数据帧中:

Year    | Month      | Counter  
---------------------------
2012    | Jan        | 100          
12      | January    | 200       
12      | Janu       | 300       
2012    | Feb        | 400       
13      | Febr       | 500

我必须实现两个比较器:

1) 对于列年份:p.e. “2012”==“12”

2) 对于月份列:p.e. "一月" == "一月" == "一月"

假设我已经实现了这两个比较器。我怎样才能调用它们?与this 示例一样,我已经知道我必须将我的 DataFrame 转换为 RDD 才能使用我的比较器。

我考虑过使用RDD GroupBy

请注意,我确实需要使用比较器来做到这一点。我无法使用 UDF、更改数据或创建新列。未来的想法是拥有密文列,其中我有允许我比较两个密文是否相同的功能。我想在我的比较器中使用它们。

编辑:

此刻,我试图只用一列来做到这一点,例如:

df.groupBy("Year").sum("Counter")

我有一个 Wrapper 类:

class ExampleWrapperYear (val year: Any) extends Serializable {
      // override hashCode and Equals methods
}

那么,我正在这样做:

val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()

我的问题是如何进行“求和”,以及如何将 keyBy 与多列一起使用以使用 ExampleWrapperYear 和 ExampleWrapperMonth。

【问题讨论】:

  • 为什么要使用比较器??
  • 我的目标是了解我是否可以使用比较器来做到这一点。正如我在问题中所说,我未来的目标是使用密文,我不能使用 UDF 来解密服务器端的数据。想象一下,我有一个包含 3 个条目的“年龄”列:20、25、20。在使用非确定性密码加密数据后,我得到一个包含 XXX、YYY、ZZZ 的列(总是不同的值)。我有一个函数告诉我 XXX 和 ZZZ 是相同的(只是一个例子)。所以我想调用一个比较器来调用我的函数,让我知道它们是相同的。
  • 所以我理解正确,比较器告诉您它们相等,但您无法将加密值映射到它们相等的空间?
  • 是的,这是我的挑战。我发现这个example 可能与我的提议非常相似。但我不明白如何在我的情况下应用它。例如,在我上面提出的查询中。
  • 为了简单起见,如果你只有一个告诉你键是否相等的函数,你唯一的解决方案是叉积并进行每一对比较。比这更好的唯一方法是让您告诉我们更多有关您的具体用例的信息,以便我们找到降低复杂性的技巧。

标签: scala sorting apache-spark apache-spark-sql


【解决方案1】:

这个解决方案应该可以工作。这里是实现 hashCode 和 equals 的案例类(我们可以称之为比较器)。

你可以根据不同的密文修改/更新 hashCode 和 equals

  case class Year(var year:Int){

    override def hashCode(): Int = {
      this.year = this.year match {
        case 2012 => 2012
        case 12 => 2012
        case 13 => 2013
        case _ => this.year
      }
      this.year.hashCode()
    }

    override def equals(that: Any): Boolean ={
      val year1 = 2000 + that.asInstanceOf[Year].year % 100
      val year2 = 2000 + this.year % 100
      if (year1 == year2)
        true
      else
        false
    }
  }

  case class Month(var month:String){

    override def hashCode(): Int = {
      this.month = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      this.month.hashCode
    }

    override def equals(that: Any): Boolean ={
      val month1 = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      val month2 = that.asInstanceOf[Month].month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => that.asInstanceOf[Month].month
      }
      if (month1.equals(month2))
        true
      else
        false
    }
  }

这里是分组键的重要比较器,它只是使用单独的 col 比较器

  case class Key(var year:Year, var month:Month){

    override def hashCode(): Int ={
      this.year.hashCode() + this.month.hashCode()
    }

    override def equals(that: Any): Boolean ={
      if ( this.year.equals(that.asInstanceOf[Key].year) && this.month.equals(that.asInstanceOf[Key].month))
        true
      else
        false
    }
  }

  case class Record(year:Int,month:String,counter:Int)

  val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data.csv").as[Record]

  df.rdd.groupBy[Key](
      (record:Record)=>Key(Year(record.year), Month(record.month)))
      .map(x=> Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
      .toDS().show()

给了

+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012|  Feb|    800|
|2013|  Feb|    500|
|2012|  Jan|    700|
+----+-----+-------+

for this input in data.csv

Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100

请注意,对于 Year 和 Month 案例类,还将值更新为标准值(否则无法预测它会选择哪个值)。

【讨论】:

  • 这正是我想做的!谢谢
【解决方案2】:

可以使用udfs实现逻辑,使其成为标准的年月格式

  def toYear : (Integer) => Integer = (year:Integer)=>{
    2000 + year % 100 //assuming all years in 2000-2999 range
  }

  def toMonth : (String) => String = (month:String)=>{
    month match {
      case "January"=> "Jan"
      case "Janu"=> "Jan"
      case "February" => "Feb"
      case "Febr" => "Feb"
      case _ => month
    }
  }

  val toYearUdf = udf(toYear)
  val toMonthUdf = udf(toMonth)

  df.groupBy( toYearUdf(col("Year")), toMonthUdf(col("Month"))).sum("Counter").show()

【讨论】:

  • 可能你在我的最后一段中没有注意到。我真的需要使用比较器来做这个操作。未来的想法是有密文列,其中我有允许我比较两个密文是否相同的功能。我更新了最后一段,让其他用户明白为什么我不能使用 UDF。
猜你喜欢
  • 2018-10-03
  • 1970-01-01
  • 1970-01-01
  • 2019-08-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-03-28
  • 2018-04-04
相关资源
最近更新 更多