【发布时间】:2015-05-17 22:45:06
【问题描述】:
这是我的一个 spark 工作的一个小问题,它似乎不会引起任何问题 - 但每次我看到它时都会让我烦恼并且无法提出更好的解决方案。
假设我有一个这样的 Scala 集合:
val myStuff = List(Try(2/2), Try(2/0))
我可以用分区把这个列表分成成功和失败:
val (successes, failures) = myStuff.partition(_.isSuccess)
这很好。分区的实现只遍历源集合一次,构建两个新集合。但是,使用 Spark,我能够设计出的最佳等价物是这样的:
val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }
除了解包 Try(这很好)的区别之外,还需要遍历数据两次。这很烦人。
有没有更好的 Spark 替代方案可以在不进行多次遍历的情况下拆分 RDD?即具有类似这样的签名,其中分区具有 Scala 集合分区而不是 RDD 分区的行为:
val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)
作为参考,我之前使用过类似下面的方法来解决这个问题。可能失败的操作是从二进制格式中反序列化一些数据,并且这些失败已经变得足够有趣,以至于它们需要被处理并保存为 RDD 而不是记录的内容。
def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
try {
Some(deserialize(data))
} catch {
case e: MyDesrializationError => {
logger.error(e)
None
}
}
}
【问题讨论】:
标签: scala apache-spark scala-collections