【问题标题】:Spark Shell : Task not SerializableSpark Shell:任务不可序列化
【发布时间】:2016-12-30 20:02:47
【问题描述】:

我是 Spark、Scala 和 Cassandra 的新手。 使用 Spark,我正在尝试从 MySQL 获取一些 id。

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
Class.forName("com.mysql.jdbc.Driver").newInstance

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

val myRDD = new JdbcRDD( sc, () => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ;
myRDD.foreach(println) 

我可以看到打印在控制台上的 ID。

现在,对于每个获取的 id,我需要对 Cassandra 中的表进行 Sum 操作。

我创建了一个函数,我可以通过传递个人 id 来调用它

object HelloWorld { 
       def sum(id : String): Unit = {
        val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum
        println(each_spark_rdd)
      }
  }

并将 uplink_rdd 声明为

 val uplink_rdd = sc.cassandraTable("keyspace", "table")

我可以通过传递个人 id 来调用函数,并且可以看到总和

scala> HelloWorld.sum("5") 
50

当我尝试在每个 fetch id 上运行相同的函数时

myRDD.map(HelloWorld.sum)
or
myRDD.foreach(HelloWorld.sum)
or 
for (id <- myRDD) HelloWorld.sum(id)

它给出与异常相同的异常

org.apache.spark.SparkException:任务不可序列化 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:59) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) 在$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:63) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:65) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:67) 在 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) 在 $iwC$$iwC$$iwC$$iwC$$iwC.(:71) 在 $iwC$$iwC$$iwC$$iwC.(:73) 在 $iwC$$iwC$$iwC.(:75) 在 $iwC$$iwC.(:77) 在 $iwC.(:79) 在 (:81) 在 .(:85) 在 .() 在 .(:7) at .() at $print() 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 在 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) 在 org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 在 org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 在 org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 在 org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 在 org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 在 org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 在 org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 在 org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 在 org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 在 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 在 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 在 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 在 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 在 org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 在 org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 在 org.apache.spark.repl.Main$.main(Main.scala:31) 在 org.apache.spark.repl.Main.main(Main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 引起 作者:java.io.NotSerializableException:org.apache.spark.SparkConf

在阅读Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually as 后,我尝试将@transient 添加到RDD

@transient val myRDD = new JdbcRDD ...
@transient val uplink_rdd = sc.cassandra....

但仍然出现同样的错误。

请告诉我如何从 Cassandara 表中找到从 Mysql 获取的每个 id 的总和。

【问题讨论】:

  • 问题在于,您实际上是在尝试在转换中执行操作——Spark 中的转换和操作不能嵌套。当您调用 foreach 时,Spark 会尝试序列化 HelloWorld.sum 以将其传递给每个执行程序 - 但这样做它也必须序列化函数的闭包,其中包括 uplink_rdd(并且不可序列化)。但是,当您发现自己尝试做这种事情时,通常只是表明您想使用join 或类似的东西。
  • 你检查了吗? link
  • 对象 HelloWorld 扩展可序列化

标签: scala serialization apache-spark cassandra rdd


【解决方案1】:

您的代码试图在myRDD 的转换中使用uplink_rdd。应用于 RDD 的闭包不能包含另一个 RDD。

您应该按照joinWithCassandraTable 的方式做一些事情,这将并行和分布式(ly?)使用来自myRDD 的信息从Cassandra 中提取数据。如果您从 Cassandra 中提取单个分区键,则此方法有效

the Docs

另一种选择是使用连接器使用的池中的手动连接图。

val cc = CassandraConnector(sc.getConf)
myRDD.mapPartitions { it =>
  cc.withSessionDo { session =>
    session.execute("whatever query you want")
  }
}

如果您实际上是在 Cassandra 中对多个分区求和,则需要 为每个 ID 创建一个新的 rdd。

类似

myRDD.collect.foreach(HelloWorld.sum)

【讨论】:

    猜你喜欢
    • 2018-04-06
    • 2021-03-16
    • 1970-01-01
    • 2015-12-16
    • 2017-03-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多