【发布时间】:2017-02-28 10:21:25
【问题描述】:
...
val cols: Seq[String] = Seq("item", "SR", "RP")
val vecToSeq = udf((v:org.apache.spark.ml.linalg.Vector) => v.toArray)
val exprs = cols.zipWithIndex.map{ case(c,i) => $"_tmp".getItem(i).alias(c)}
val DoubleDF = result5.select(vecToSeq($"vectorCol").alias("_tmp")).select(exprs:_*)
...(对不起。我已经包含了所有我认为相关的代码,因为我不知道我应该提供多少信息。)
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.mllib.recommendation.{Rating,ALS}
...
val als = new org.apache.spark.ml.recommendation.ALS().setRank(10).setMaxIter(10).setRegParam(0.01).setUserCol("user").setItemCol("item").setRatingCol("rating")
...
val itemFactors = model.itemFactors
val item = Popular.select($"item").map(line => line.getDouble(0)).take(10).map(_.toInt)
val popularFactored = itemFactors.where(item.map($"id" === _).reduce(_||_))
val exprs2 = (0 until 10).map(i => $"_tmp2".getItem(i).alias(s"z$i"))
val factored = popularFactored.select(($"features").alias("_tmp2")).select(exprs2:_*)
val AF = (0 until 10).map(i => factored.agg(avg(s"z$i")).first.getDouble(0)).toArray
val toFloat = udf((line: Seq[Float]) => line.map(_.toDouble))
val test = itemFactors.withColumn("testX",toFloat(itemFactors("features")))
val itemFactors2 = test.select($"id",$"testX")
val itemFeatures2 = itemFactors2.map{line => val feature = line.getAs[Seq[Double]]("testX")
val item = line.getAs[Int]("id")
(item,feature.toArray)}
val itemFeaturesR = itemFeatures2.rdd
val ItemFS= itemFeaturesR.map { case (id,factor) =>
val arr= new DoubleMatrix(10)
for (i <- 0 until 10){
val itemVector = new DoubleMatrix(AF)
val factorVector = factor(i)
arr.put(i,factorVector)
val sims = cosineSimilarity(arr.getRow(i), itemVector)
(id,sims)
}
}
我现在正在测量余弦相似度。但是,当我运行上面的代码时,我得到一个错误 'org.apache.spark.SparkException: Task not serializable
`org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
... 59 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: _tmp[0] AS `item`)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@76a47056)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(_tmp[0] AS `item`, _tmp[1] AS `SR`, _tmp[2] AS``RP
'
我该如何解决?
【问题讨论】:
-
您可以添加更多代码吗?可能有几个问题。
-
@AndreiT。代码已更新。请确认一次。
-
尝试用
@transient标记exprs变量。 -
@AndreiT。对不起。我是初学者,我没有完全理解。具体是怎么改的?
-
在第三行(第一个示例),只需输入
@transient val exprs = ...而不是val exprs = ...
标签: scala apache-spark serialization apache-spark-mllib