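【Posted】: 2016-09-01 13:43:42
【Question】:
I want to run the Spark RowSimilarity recommender on data fetched from MongoDB. To do that, I wrote the code below, which reads the input from Mongo and converts it to an RDD of objects. This RDD has to be passed to IndexedDatasetSpark, which is then passed to SimilarityAnalysis.rowSimilarityIDS.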
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat
object SparkExample extends App {
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")

  val sparkConf = new SparkConf()
  val sc = new SparkContext("local", "SparkExample", sparkConf)

  // Read the collection as (key, BSON document) pairs
  val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject]
  )

  // Map each document to (product_id, space-separated attribute tokens)
  val new_doc: RDD[(String, String)] = documents.map(
    doc1 => (
      doc1._2.get("product_id").toString(),
      doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-")).mkString(" ")
    )
  )

  var myIDs = IndexedDatasetSpark(new_doc)(sc)
  SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
}
I am unable to create an IndexedDatasetSpark that can be passed to SimilarityAnalysis.rowSimilarityIDS. Please help me with this.
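For readers following the map step in the code above, the string clean-up applied to product_attribute_value can be pulled out into a small pure function and checked without Spark or MongoDB. This is only a sketch: the name normalizeAttributes is hypothetical, and the input format (a stringified list such as `[ "A" , "B"]`) is inferred from the replace and split calls in the question, not confirmed by it.

```scala
object AttributeNormalizer {
  // Hypothetical helper: the same chained calls as in the question's map step.
  def normalizeAttributes(raw: String): String =
    raw
      .replace("[ \"", "")                  // drop the leading `[ "`
      .replace("\"]", "")                   // drop the trailing `"]`
      .split("\" , \"")                     // split on the `" , "` separator
      .map(_.toLowerCase.replace(" ", "-")) // one lower-case, hyphenated token per value
      .mkString(" ")                        // space-delimited, as passed to IndexedDatasetSpark
}
```

Under that assumed format, `AttributeNormalizer.normalizeAttributes("[ \"Red Color\" , \"Large Size\"]")` yields `red-color large-size`.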
Edit 1:
I managed to create the IndexedDatasetSpark object, and the code now compiles. I had to pass (sc) explicitly for the implicit parameter of IndexedDatasetSpark to get past this error:
Error: could not find implicit value for parameter sc: org.apache.spark.SparkContext
Now, when I run it, I get the following error:
Error: could not find implicit value for parameter sc: org.apache.mahout.math.drm.DistributedContext
I don't know how to provide a DistributedContext.
Is this the right way to create the RDD and convert it to an IDS so that it can be processed by rowSimilarityIDS?
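One possible way to supply the missing DistributedContext, sketched under several assumptions: that the Mahout Spark bindings in use expose mahoutSparkContext with the parameter names shown, that it returns a SparkDistributedContext with a public sc field, and that DefaultIndexedDatasetWriteSchema is an acceptable stand-in for the readWriteSchema value the question leaves undefined. The in-memory rows are made up and replace the MongoDB input so the sketch stays short.

```scala
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetWriteSchema
import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object DistributedContextSketch extends App {
  // Assumption: mahoutSparkContext creates the SparkContext and wraps it in a
  // SparkDistributedContext, which is a DistributedContext. Declaring it
  // implicit lets dfsWrite find its implicit parameter.
  implicit val mc: SparkDistributedContext = mahoutSparkContext(
    masterUrl = "local",
    appName = "SparkExample",
    sparkConf = new SparkConf(),
    addMahoutJars = false // assumed to skip the MAHOUT_HOME jar lookup
  )

  // Made-up rows standing in for the (product_id, attributes) pairs from MongoDB.
  val newDoc: RDD[(String, String)] = mc.sc.parallelize(Seq(
    "p1" -> "red-color large-size",
    "p2" -> "red-color small-size",
    "p3" -> "blue-color large-size"
  ))

  // IndexedDatasetSpark wants a plain SparkContext, so pass the wrapped one.
  val ids = IndexedDatasetSpark(newDoc)(mc.sc)

  // The write schema is an assumption; the question's readWriteSchema is not shown.
  SimilarityAnalysis.rowSimilarityIDS(ids)
    .dfsWrite("hdfs://myhadoop:9000/myfile", DefaultIndexedDatasetWriteSchema)
}
```

The design choice here is to let Mahout build the context instead of calling new SparkContext directly, so a single object can serve both the Spark and the Mahout APIs.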
More context: I started from this question: Run Mahout RowSimilarity recommender on MongoDB data
My build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
Edit 2: I temporarily removed dfsWrite so that the code would execute, and ran into the following error:
java.io.NotSerializableException: org.apache.mahout.math.DenseVector
Serialization stack:
- object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0})
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Is there some serialization step that I may have skipped?
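The JavaSerializationStream frames in the trace above suggest that Spark is using its default Java serializer, which cannot handle Mahout's DenseVector. A commonly used alternative is Kryo together with Mahout's registrator. The configuration below is a sketch, not a confirmed fix: it assumes the class org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator is available in the mahout-spark artifact on the classpath.

```scala
import org.apache.spark.SparkConf

object KryoConfSketch {
  // Switch Spark from Java serialization to Kryo and let Mahout register
  // its vector and matrix classes (registrator class name is an assumption).
  val sparkConf: SparkConf = new SparkConf()
    .setMaster("local")
    .setAppName("SparkExample")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
}
```

Passing KryoConfSketch.sparkConf to the SparkContext constructor in place of the empty SparkConf would apply these settings before any job runs.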
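【Comments】:
-
Did you forget to show the error?
-
@pferrel: I have edited the question to include the latest error. Please let me know whether I am following the right procedure in Scala/Spark/Mahout.
-
@pferrel: After removing dfsWrite and letting rowSimilarity run, I hit a new problem. I have updated the question.
Tags: mongodb scala apache-spark mahout mahout-recommender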