【发布时间】:2016-12-30 20:02:47
【问题描述】:
我是 Spark、Scala 和 Cassandra 的新手。 使用 Spark,我正在尝试从 MySQL 获取一些 id。
import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
Class.forName("com.mysql.jdbc.Driver").newInstance
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
val myRDD = new JdbcRDD( sc, () => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ;
myRDD.foreach(println)
我可以看到打印在控制台上的 ID。
现在,对于每个获取的 id,我需要对 Cassandra 中的表进行 Sum 操作。
我创建了一个函数,我可以通过传递个人 id 来调用它
object HelloWorld {
def sum(id : String): Unit = {
val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum
println(each_spark_rdd)
}
}
并将 uplink_rdd 声明为
val uplink_rdd = sc.cassandraTable("keyspace", "table")
我可以通过传递个人 id 来调用函数,并且可以看到总和
scala> HelloWorld.sum("5")
50
当我尝试在每个 fetch id 上运行相同的函数时
myRDD.map(HelloWorld.sum)
or
myRDD.foreach(HelloWorld.sum)
or
for (id <- myRDD) HelloWorld.sum(id)
它给出与异常相同的异常
org.apache.spark.SparkException:任务不可序列化 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:59) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) 在$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:63) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:65) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:67) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) 在 $iwC$$iwC$$iwC$$iwC$$iwC.(:71) 在 $iwC$$iwC$$iwC$$iwC.(:73) 在 $iwC$$iwC$$iwC.(:75) 在 $iwC$$iwC.(:77) 在 $iwC.(:79) 在 (:81) 在 .(:85) 在 .() 在 .(:7) at .() at $print() 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 在 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) 在 org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 在 org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 在 org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 在 org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 在 org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 在 org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 在 org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 在 org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 在 org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 在 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 在 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 在 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 在 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 在 org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 在 org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 在 org.apache.spark.repl.Main$.main(Main.scala:31) 在 org.apache.spark.repl.Main.main(Main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 引起 作者:java.io.NotSerializableException:org.apache.spark.SparkConf
在阅读Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually as 后,我尝试将@transient 添加到RDD
@transient val myRDD = new JdbcRDD ...
@transient val uplink_rdd = sc.cassandra....
但仍然出现同样的错误。
请告诉我如何从 Cassandara 表中找到从 Mysql 获取的每个 id 的总和。
【问题讨论】:
-
问题在于,您实际上是在尝试在转换中执行操作——Spark 中的转换和操作不能嵌套。当您调用
foreach时,Spark 会尝试序列化HelloWorld.sum以将其传递给每个执行程序 - 但这样做它也必须序列化函数的闭包,其中包括uplink_rdd(并且不可序列化)。但是,当您发现自己尝试做这种事情时,通常只是表明您想使用join或类似的东西。 -
你检查了吗? link
-
对象 HelloWorld 扩展可序列化
标签: scala serialization apache-spark cassandra rdd