【问题标题】:reduceByKey using Scala object as keyreduceByKey 使用 Scala 对象作为键
【发布时间】:2015-03-26 14:15:58
【问题描述】:

我将 spark 与 scala 一起使用,并且我有一个充满 tuple2 的 RDD,其中包含一个复杂对象作为键和一个双精度对象。目的是在对象相同的情况下添加双倍(频率)。

为此,我将我的对象定义如下:

    case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{
      def compare(that: SimpleCoocurrence) = {
        if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos)
           &&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos))
          0
        else
          this.toString.compareTo(that.toString)
      }
    }

现在我正在尝试像这样使用 reduceBykey:

val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
coocRDD.map(tup=>tup).reduceByKey(_+_)
println(coocRDD.count)

但是,结果表明处理reducebykey前后的RDD包含完全相同数量的元素。

如何使用 tuple2[SimpleCoocurrence,Double] 执行 reduceByKey? 实现 Ordered 特征是告诉 Spark 如何比较我的对象的好方法吗? 我应该只使用 tuple2[String,Double] 吗?

谢谢,

【问题讨论】:

    标签: scala apache-spark reduce


    【解决方案1】:

    reduceByKey 不使用 Ordering 而是使用 hashCodeequals 来确定哪些键是相同的。特别是,hashPartitioner 将按哈希对键进行分组,以便具有相同 hashCode 的键落在同一个分区上,从而可以在每个分区上进行进一步的缩减。

    case 类具有equalshashCode 的默认实现。可能使用的测试数据具有字段distance:Double 的不同值,使每个实例成为唯一的对象。使用它作为键将导致只有相同的对象被归约为一个。

    解决此问题的一种方法是为您的case class 定义一个键,并为该对象定义一个添加方法,如下所示:

    case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Serializable {
       val key = word + word_pos + cooc + cooc_pos
    }
    object SimpleCoocurrence {
       val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
    }
    
    val coocList:List[SimpleCoocurrence] = ???
    val coocRDD = sc.parallelize(coocList)
    val coocByKey = coocRDD.keyBy(_.key)
    val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)
    

    (*) 代码作为指导示例提供 - 未经编译或测试。

    【讨论】:

      【解决方案2】:

      首先,我很笨……

      接下来,如果有人遇到同样的问题并想在 Spark 上使用复杂的 scala 对象作为 reduceByKey 的 Key:

      Spark 知道如何比较两个对象,即使它们没有实现 Ordered。所以上面的代码实际上是函数式的。

      唯一的问题是……我之前和之后都打印了相同的 RDD。当我写这个时,它实际上工作得很好。

      val coocRDD = sc.parallelize(coocList)
      println(coocRDD.count)
      val newRDD = coocRDD.map(tup=>tup).reduceByKey(_+_)
      println(newRDD.count)
      

      【讨论】:

        【解决方案3】:

        您没有存储 reduceByKey 的结果。试试这个:

        val coocRDD = sc.parallelize(coocList)
        println(coocRDD.count)
        val result = coocRDD.map(tup=>tup).reduceByKey(_+_)
        println(result.count)
        

        【讨论】:

          猜你喜欢
          • 2017-02-05
          • 1970-01-01
          • 2013-09-11
          • 2015-05-13
          • 2023-03-11
          • 2017-10-13
          • 2023-03-23
          • 2022-01-25
          • 2015-10-02
          相关资源
          最近更新 更多