【Question title】: org.apache.spark.SparkException: Task not serializable error
【Posted】: 2017-02-28 10:21:25
【Question】:

...

val cols: Seq[String] = Seq("item", "SR", "RP")
val vecToSeq = udf((v: org.apache.spark.ml.linalg.Vector) => v.toArray)
val exprs = cols.zipWithIndex.map{ case(c,i) => $"_tmp".getItem(i).alias(c)}
val DoubleDF = result5.select(vecToSeq($"vectorCol").alias("_tmp")).select(exprs:_*)

... (Sorry. I have included all the code I thought was relevant, because I don't know how much information I should provide.)

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.mllib.recommendation.{Rating,ALS}
...

val als = new org.apache.spark.ml.recommendation.ALS().setRank(10).setMaxIter(10).setRegParam(0.01).setUserCol("user").setItemCol("item").setRatingCol("rating")
...

val itemFactors = model.itemFactors
val item = Popular.select($"item").map(line => line.getDouble(0)).take(10).map(_.toInt)

val popularFactored = itemFactors.where(item.map($"id" === _).reduce(_||_)) 

val exprs2 = (0 until 10).map(i => $"_tmp2".getItem(i).alias(s"z$i"))
val factored = popularFactored.select(($"features").alias("_tmp2")).select(exprs2:_*)

val AF = (0 until 10).map(i => factored.agg(avg(s"z$i")).first.getDouble(0)).toArray


val toFloat = udf((line: Seq[Float]) => line.map(_.toDouble))
val test = itemFactors.withColumn("testX",toFloat(itemFactors("features")))
val itemFactors2 = test.select($"id",$"testX")


val itemFeatures2 = itemFactors2.map { line =>
  val feature = line.getAs[Seq[Double]]("testX")
  val item = line.getAs[Int]("id")
  (item, feature.toArray)
}

val itemFeaturesR = itemFeatures2.rdd


val ItemFS = itemFeaturesR.map { case (id, factor) =>
  val arr = new DoubleMatrix(10)
  for (i <- 0 until 10) {
    val itemVector = new DoubleMatrix(AF)
    val factorVector = factor(i)
    arr.put(i, factorVector)
    val sims = cosineSimilarity(arr.getRow(i), itemVector)
    (id, sims)
  }
}

I am currently measuring cosine similarity. However, when I run the code above, I get the error 'org.apache.spark.SparkException: Task not serializable':

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.map(RDD.scala:365)
  ... 59 elided
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.Column, value: _tmp[0] AS `item`)
        - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@76a47056)
        - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon, List(_tmp[0] AS `item`, _tmp[1] AS `SR`, _tmp[2] AS `RP`))

How can I fix this?

【Comments】:

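  • Can you add more code? There could be several problems.
  • @AndreiT. The code has been updated. Please take a look.
  • Try marking the exprs variable with @transient.
  • @AndreiT. Sorry. I am a beginner and did not fully understand. What exactly should I change?
  • On the third line (of the first example), just write @transient val exprs = ... instead of val exprs = ...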
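
The @transient suggestion in the comments can be illustrated without Spark. The sketch below is only a model of the situation: FakeColumn stands in for org.apache.spark.sql.Column, and ShellState stands in for the serializable wrapper that spark-shell is assumed to build around top-level vals. Both names, and TransientDemo, are hypothetical. It shows that a function reading one field drags the whole enclosing object into serialization, and that @transient takes a field out of it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.sql.Column: deliberately not Serializable.
class FakeColumn(val name: String)

// Hypothetical stand-in for the serializable wrapper spark-shell builds around your vals.
class ShellState extends Serializable {
  val exprs: List[FakeColumn] = List("item", "SR", "RP").map(n => new FakeColumn(n))
  val AF: Array[Double] = Array(1.0, 2.0)
  // Reading the field AF makes the function capture `this`, and exprs comes along.
  def scale: Double => Double = x => x * AF(0)
}

// Same wrapper, but exprs is skipped by Java serialization.
class ShellStateTransient extends Serializable {
  @transient val exprs: List[FakeColumn] = List("item", "SR", "RP").map(n => new FakeColumn(n))
  val AF: Array[Double] = Array(1.0, 2.0)
  def scale: Double => Double = x => x * AF(0)
}

object TransientDemo {
  // Returns true if Java serialization accepts the object,
  // false if it runs into a non-serializable value.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Here TransientDemo.canSerialize(new ShellState().scale) should come back false and the ShellStateTransient variant true. A @transient field is null after deserialization, so this is only safe while the shipped function never reads it. In the question, exprs2 also holds Column values, so it would presumably need the same annotation.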

Tags: scala apache-spark serialization apache-spark-mllib


【Answer 1】:

Try declaring your class with Serializable.
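
A minimal sketch of what that could look like, assuming the similarity logic lives in a helper class of your own. The class name Similarity and its array-based cosine method are hypothetical and are not the cosineSimilarity function from the question. Note that this only helps when the class reported in the trace is one you wrote; the trace above names org.apache.spark.sql.Column, which belongs to Spark, so this alone may not be enough.

```scala
// Hypothetical helper class. Extending Serializable lets Java serialization
// accept instances of it when a closure that captures one is shipped to executors.
class Similarity extends Serializable {
  // Cosine similarity of two equal-length vectors.
  // Assumes neither vector is all zeros (otherwise the norm is 0).
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }
}
```

An instance created on the driver, for example val sim = new Similarity, can then be referenced inside a map without the instance itself being the object that fails to serialize.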

【Comments】:

    【Answer 2】:

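    'org.apache.spark.SparkException: Task not serializable' tells you that something your task references, apparently your class, cannot be serialized. Because Spark computes in parallel, it has to ship your code (the closure) to the nodes of the cluster, and to do that it must serialize the closure together with everything the closure references. Extend your class definition with Serializable.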
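
To make the point about closures concrete, here is a Spark-free sketch using plain Java serialization. Driver and ClosureDemo are hypothetical names, and Driver is intentionally not Serializable. It shows an alternative to extending Serializable that is commonly used: copy the value you need into a local val, so the function captures only that value rather than the enclosing object.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical driver-side class that is NOT Serializable.
class Driver {
  val AF: Array[Double] = Array(0.5, 2.0)

  // Reads this.AF inside the function, so the function captures the whole Driver.
  def capturesThis: Double => Double = x => x * AF(0)

  // Copies AF into a local val first; the function then captures only the array.
  def capturesLocal: Double => Double = {
    val localAF = AF
    x => x * localAF(0)
  }
}

object ClosureDemo {
  // Returns true if Java serialization accepts the function,
  // false if it runs into a non-serializable captured value.
  def canSerialize(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

ClosureDemo.canSerialize(new Driver().capturesThis) should be false, while the capturesLocal variant should be true. Whether the same trick is enough in spark-shell depends on how the shell wraps each line, so treat it as something to try rather than a guaranteed fix.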

    【Comments】:

    • Sorry. I am a beginner and did not fully understand. What exactly should I change?