【问题标题】:Spark streaming nested execution serialization issuesSpark 流式嵌套执行序列化问题
【发布时间】:2018-09-07 20:33:45
【问题描述】:

我正在尝试连接 Spark 流应用程序中的 DB2 数据库和导致“org.apache.spark.SparkException: Task not serializable”问题的数据库查询执行语句。请指教。以下是我供参考的示例代码。

        dataLines.foreachRDD{rdd=>
          val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

          val dataRows=rdd.map(rs => rs.value).map(row =>
            row.split(",")(1)-> (row.split(",")(0), row.split(",")(1), row.split(",")(2)
              , "cvflds_"+row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
          )

          val db2Conn = getDB2Connection(spark,db2ConParams)

          dataRows.foreach{ case (k,v) =>
              val table = v._4
              val dbQuery = s"(SELECT * FROM $table ) tblResult"
              val df=getTableData(db2Conn,dbQuery)
              df.show(2)
          }
        }


Below is other function code:

  private def getDB2Connection(spark: SparkSession, db2ConParams:scala.collection.immutable.Map[String,String]): DataFrameReader = {
      spark.read.format("jdbc").options(db2ConParams)
  }

  private def getTableData(db2Con: DataFrameReader,tableName: String):DataFrame ={
      db2Con.option("dbtable",tableName).load()
  }



object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

以下是错误日志:

2018-03-28 22:12:21,487 [JobScheduler] 错误 org.apache.spark.streaming.scheduler.JobScheduler - 运行作业流作业时出错 1522289540000 ms.0 org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:915) 在 ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:139) 在 ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:128) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 在 org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 在 scala.util.Try$.apply(Try.scala:192) 在 org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 在 scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:745) 引起:java.io.NotSerializableException:org.apache.spark.sql.DataFrameReader 序列化栈: - 对象不可序列化(类:org.apache.spark.sql.DataFrameReader,值:org.apache.spark.sql.DataFrameReader@15fdb01) - 字段(类:ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2,名称:db2Conn$1,类型:类 org.apache.spark.sql.DataFrameReader) - 对象(类 ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, ) 在 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 在 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 在 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 30 更多

【问题讨论】:

  • 请发布错误消息的全文,包括堆栈跟踪。
  • 嗨,Ken,添加了错误日志。请检查。
  • 您可以尝试将 spark 会话创建行移出 dataLines.foreachRDD 吗?
  • 嘿 Vinod,将 spark 会话移出 foreachRDD 并不能解决我的问题。我必须为我在 foreach 循环中收到的每条消息查询数据库。
  • 另一个猜测是将方法作为函数。你能检查stackoverflow.com/questions/22592811/…

标签: database apache-spark serialization streaming


【解决方案1】:

理想情况下,您应该保持dataRows.foreach 中的闭包没有任何连接对象,因为闭包是为了序列化到执行程序并在那里运行。这个概念深入探讨@本官link

在您的情况下,以下行是导致问题的关闭:

val df=getTableData(db2Conn,dbQuery)

所以,而不是使用 spark 来加载 DB2 表,在您的情况下变为(在组合方法之后):

spark.read.format("jdbc").options(db2ConParams).option("dbtable",tableName).load()

在闭包中使用普通的 JDBC 来实现这一点。您可以在 jdbc 代码中使用db2ConParams。 (我认为它足够简单,可以序列化)。该链接还建议使用rdd.foreachPartitionConnectionPool 进一步优化。

除了df.show(2),您还没有提到您对表格数据的处理。如果行很大,那么您可以讨论更多有关您的用例的信息。也许,你需要考虑一个不同的设计。

【讨论】:

  • Sujit,感谢您对使用传统方式获取数据的建议。我试过这个它看起来工作。感谢您的帮助。
  • @MarutiK,很高兴为您提供帮助。考虑投票/标记为答案。
猜你喜欢
  • 2017-03-10
  • 2020-12-06
  • 2018-07-05
  • 2015-10-20
  • 2015-06-18
  • 2022-10-16
  • 2016-10-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多