【发布时间】:2018-09-07 20:33:45
【问题描述】:
我正在尝试连接 Spark 流应用程序中的 DB2 数据库和导致“org.apache.spark.SparkException: Task not serializable”问题的数据库查询执行语句。请指教。以下是我供参考的示例代码。
dataLines.foreachRDD{rdd=>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
val dataRows=rdd.map(rs => rs.value).map(row =>
row.split(",")(1)-> (row.split(",")(0), row.split(",")(1), row.split(",")(2)
, "cvflds_"+row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
)
val db2Conn = getDB2Connection(spark,db2ConParams)
dataRows.foreach{ case (k,v) =>
val table = v._4
val dbQuery = s"(SELECT * FROM $table ) tblResult"
val df=getTableData(db2Conn,dbQuery)
df.show(2)
}
}
Below is other function code:
private def getDB2Connection(spark: SparkSession, db2ConParams:scala.collection.immutable.Map[String,String]): DataFrameReader = {
spark.read.format("jdbc").options(db2ConParams)
}
private def getTableData(db2Con: DataFrameReader,tableName: String):DataFrame ={
db2Con.option("dbtable",tableName).load()
}
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
以下是错误日志:
2018-03-28 22:12:21,487 [JobScheduler] 错误 org.apache.spark.streaming.scheduler.JobScheduler - 运行作业流作业时出错 1522289540000 ms.0 org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:915) 在 ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:139) 在 ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:128) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 在 org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 在 scala.util.Try$.apply(Try.scala:192) 在 org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 在 scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:745) 引起:java.io.NotSerializableException:org.apache.spark.sql.DataFrameReader 序列化栈: - 对象不可序列化(类:org.apache.spark.sql.DataFrameReader,值:org.apache.spark.sql.DataFrameReader@15fdb01) - 字段(类:ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2,名称:db2Conn$1,类型:类 org.apache.spark.sql.DataFrameReader) - 对象(类 ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, ) 在 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 在 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 在 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 30 更多
【问题讨论】:
-
请发布错误消息的全文,包括堆栈跟踪。
-
嗨,Ken,添加了错误日志。请检查。
-
您可以尝试将 spark 会话创建行移出 dataLines.foreachRDD 吗?
-
嘿 Vinod,将 spark 会话移出 foreachRDD 并不能解决我的问题。我必须为我在 foreach 循环中收到的每条消息查询数据库。
-
另一个猜测是将方法作为函数。你能检查stackoverflow.com/questions/22592811/…
标签: database apache-spark serialization streaming