【问题标题】:Spark RDD equivalent to Scala collections partitionSpark RDD 相当于 Scala 集合分区
【发布时间】:2015-05-17 22:45:06
【问题描述】:

这是我的一个 spark 工作的一个小问题,它似乎不会引起任何问题 - 但每次我看到它时都会让我烦恼并且无法提出更好的解决方案。

假设我有一个这样的 Scala 集合:

val myStuff = List(Try(2/2), Try(2/0))

我可以用分区把这个列表分成成功和失败:

val (successes, failures) =  myStuff.partition(_.isSuccess)

这很好。分区的实现只遍历源集合一次,构建两个新集合。但是,使用 Spark,我能够设计出的最佳等价物是这样的:

val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }

除了解包 Try(这很好)的区别之外,还需要遍历数据两次。这很烦人。

有没有更好的 Spark 替代方案可以在不进行多次遍历的情况下拆分 RDD?即具有类似这样的签名,其中分区具有 Scala 集合分区而不是 RDD 分区的行为:

val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)

作为参考,我之前使用过类似下面的方法来解决这个问题。可能失败的操作是从二进制格式中反序列化一些数据,并且这些失败已经变得​​足够有趣,以至于它们需要被处理并保存为 RDD 而不是记录的内容。

def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
   try {
      Some(deserialize(data))
   } catch {
      case e: MyDesrializationError => {
         logger.error(e)
         None
      }
   }
}

【问题讨论】:

    标签: scala apache-spark scala-collections


    【解决方案1】:

    可能还有其他解决方案,但你去吧:

    设置:

    import scala.util._
    val myStuff = List(Try(2/2), Try(2/0))
    val myStuffInSpark = sc.parallelize(myStuff)
    

    执行:

    val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](),List[Try[Int]]()))(
      (accum, curr)=>if(curr.isSuccess) (curr :: accum._1,accum._2) else (accum._1, curr :: accum._2), 
      (first, second)=> (first._1 ++ second._1,first._2 ++ second._2))
    

    如果您需要解释,请告诉我

    【讨论】:

    • 确实大致相当于 List().partition;但这仅适用于小型数据集,因为它涉及将所有内容加载到列表而不是 RDD!
    • @jdeastwood 在我继续尝试其他方法之前,您能否澄清一下您的问题。这符合您的需求和示例。您要寻找的最终签名是什么。
    • 已更新以尝试使其更清晰。您在此处展示的内容很接近,但改变了集合的类型。
    • 这是不可能的:groups.google.com/forum/#!topic/spark-users/rkVPXAiCiBkJosh Rosen 的链接很好,尤其是这个groups.google.com/forum/#!msg/spark-users/KC1UJEmUeg8/…,因为它是由 Matei 直接回答的。最好的办法是检查主 RDD 是否成功。您可以使用 mapPartitions 至少获取每个节点的所有数据。
    • 嵌套 RDD 并不是正确的思考方式(我希望拆分 RDD,而不是创建 RDD 的 RDD)。但尽管如此,我认为你是对的,这是不可能的,至少没有对 RDD 的自定义扩展。谢谢!
    猜你喜欢
    • 2019-06-16
    • 2015-01-29
    • 1970-01-01
    • 1970-01-01
    • 2015-10-31
    • 2019-05-21
    • 1970-01-01
    • 2015-07-05
    • 1970-01-01
    相关资源
    最近更新 更多