【问题标题】:Implement a MergeSort like feature in spark with scala使用 scala 在 spark 中实现类似 MergeSort 的功能
【发布时间】:2016-05-10 23:58:31
【问题描述】:

Spark 版本 1.2.1 Scala 版本 2.10.4

我有 2 个由数字字段关联的 SchemaRDD:

RDD 1: (Big table - about a million records)
[A,3]
[B,4]
[C,5] 
[D,7] 
[E,8] 

RDD 2: (Small table < 100 records so using it as a Broadcast Variable)
[SUM, 2]
[WIN, 6]
[MOM, 7] 
[DOM, 9] 
[POM, 10] 

Result     
[C,5, WIN] 
[D,7, MOM] 
[E,8, DOM]
[E,8, POM]

我想要 RDD1 中的 max(field),它是

我正在尝试通过以下方式使用 Merge 来解决此问题:

  1. 按键对RDD进行排序(在一个组内排序不会超过100条记录在该组内。在上面的例子中是在一个组内)

  2. 执行类似于合并排序的合并操作。在这里,我还需要跟踪先前的值以找到最大值;我仍然只遍历列表一次。

由于这里有太多变量,我得到“任务不可序列化”异常。这种实现方法是否正确?我试图在这里避免笛卡尔积。有没有更好的方法?

添加代码 -

rdd1.groupBy(itm => (itm(2), itm(3))).mapValues( itmorg => {
  val miorec = itmorg.toList.sortBy(_(1).toString)
  for( r <- 0 to miorec.length) {
    for ( q <- 0 to rdd2.value.length) {
      if ( (miorec(r)(1).toString > rdd2.value(q).toString && miorec(r-1)(1).toString <= rdd2.value(q).toString && r > 0) ||  r == miorec.length)
           org.apache.spark.sql.Row(miorec(r-1)(0),miorec(r-1)(1),miorec(r-1)(2),miorec(r-1)(3),rdd2.value(q))
      }
    }
  }).collect.foreach(println)

【问题讨论】:

  • Since there are too may variables here I am getting "Task not serializable" exception. 似乎该异常可能是由于变量过多的其他原因。可以发一下代码吗?

标签: scala apache-spark mergesort serializable


【解决方案1】:

我不会进行全局排序。对于您的需要,这是一项昂贵的操作。找到最大值肯定比获得所有值的全局排序便宜。相反,请这样做:

  1. 对于每个分区,构建一个结构,为 RDD2 上的每一行保持 RDD1 上的最大值。这可以使用mapPartitions 和普通的 scala 数据结构轻松完成。您甚至可以在此处使用一次性合并代码。你应该得到类似HashMap(WIN -&gt; (C, 5), MOM -&gt; (D, 7), ...) 的东西
  2. 在每个执行器本地完成此操作后,使用 reduce 合并这些生成的数据结构应该很简单。

这里的目标是尽量不改组,将最复杂的操作保持在本地,因为您想要的结果大小非常小(在代码中使用 RDD1 和 RDD2 创建所有有效的键/值会更容易) aggregateByKey,但效率较低)。

至于您的异常,您需要显示代码,“任务不可序列化”通常意味着您正在传递不可序列化的闭包;-)

【讨论】:

  • 嗨丹尼尔,我已经添加了代码,你能给我一个代码片段和示例如何使用分区解决这个问题吗?
  • 遗憾的是,我现在没有时间为您构建代码,但它是easy to find examples。另外,请注意groupBy 是一个非常慢的操作,因为它会随机播放所有内容!瞄准本地reduceaggregateByKey。构建一个普通的 scala def,它会找到你寻找的结果,然后跨分区聚合。
猜你喜欢
  • 2017-07-05
  • 2019-02-28
  • 2014-03-29
  • 1970-01-01
  • 2018-02-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-20
相关资源
最近更新 更多