【问题标题】:Map can not be serializable in scala?地图不能在scala中序列化?
【发布时间】:2015-12-30 07:00:04
【问题描述】:

我是 Scala 新手。为什么“地图”功能不可序列化?如何使其可序列化?例如,如果我的代码如下:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
  val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    val a = lst.groupBy(x => x._1).mapValues(_.size)
    //val b= a.map(x => x._2)
    res = res ::: List(cur)
  }
  res.iterator
}

data.mapPartitions(myfunc).collect

如果我取消注释该行

val b= a.map(x => x._2)

代码返回异常:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2
Serialization stack:
    - object not serializable (class: scala.collection.immutable.MapLike$$anon$2, value: Map(1 -> 3))
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: a, type: interface scala.collection.immutable.Map)

非常感谢。

【问题讨论】:

  • 据我所知,它在 Spark 1.2.0 - 1.5.0 上无法重现。可以提供一些配置细节吗?你如何执行这段代码?
  • 嗨 zero323,我直接在 Spark 1.5 附带的 Scala shell 中运行了这段代码。我还在 Spark 1.0.1 的 Scala Shell 中运行了代码,也存在同样的问题。
  • 我怀疑这不是给出错误的实际代码吗?您的lst 真的只是实际代码中的一个简单列表吗?还是另一个 RDD?
  • 嗨,保罗,这是我用来演示我的问题的代码的简化版本(我对简化代码也有同样的问题)。在我的确切代码中,参数“iter”是一个元组,并且 lst = iter._2.
  • @Carter 所以为了清楚起见-您实际上可以使用这段代码重现问题吗?不从函数返回ab

标签: scala serialization apache-spark


【解决方案1】:

众所周知的 scala 错误:https://issues.scala-lang.org/browse/SI-7005 Map#mapValues is not serializable

我们的 Spark 应用程序中有这个问题,map(identity) 解决了这个问题

rdd.groupBy(_.segment).mapValues(v => ...).map(identity)

【讨论】:

【解决方案2】:

下面提供了mapValues 函数的实际实现,正如您所见,它不可序列化并且仅创建一个视图,而不是正确存在的数据,因此您会收到此错误。情境方面的mapValues 可以有很多优势。

protected class MappedValues[C](f: B => C) extends AbstractMap[A, C] with DefaultMap[A, C] {
    override def foreach[D](g: ((A, C)) => D): Unit = for ((k, v) <- self) g((k, f(v)))
    def iterator = for ((k, v) <- self.iterator) yield (k, f(v))
    override def size = self.size
    override def contains(key: A) = self.contains(key)
    def get(key: A) = self.get(key).map(f)
}

【讨论】:

    【解决方案3】:

    您是否尝试过在应用程序中运行相同的代码?我怀疑这是火花外壳的问题。如果您想让它在 spark shell 中工作,那么您可以尝试将 myfunc 的定义及其应用程序包装在花括号中,如下所示:

    val data = sc.parallelize(List(1,4,3,5,2,3,5))
    
    val result = { 
      def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
        val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
        var res = List[Int]()
        while (iter.hasNext) {
          val cur = iter.next
          val a = lst.groupBy(x => x._1).mapValues(_.size)
          val b= a.map(x => x._2)
          res = res ::: List(cur)
        }
        res.iterator
      }
      data.mapPartitions(myfunc).collect
    }
    

    【讨论】:

    • 我只在 Spark Shell 中尝试过这段代码。看起来这是 Shell 的问题。
    猜你喜欢
    • 2018-11-27
    • 1970-01-01
    • 2016-10-14
    • 2020-12-01
    • 1970-01-01
    • 2023-04-02
    • 1970-01-01
    • 2018-09-19
    • 1970-01-01
    相关资源
    最近更新 更多