【Question title】: Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext - when using JdbcRDD in Spark
【Posted】: 2017-01-11 08:30:12
【Question】:

I am trying to load an RDD from a MySQL database:

package ro.mfl.employees
import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Connection, DriverManager}

import org.apache.spark.rdd.JdbcRDD

class Loader(sc: SparkContext) {

  Class.forName("com.mysql.jdbc.Driver").newInstance()

  def connection(): Connection = {
    DriverManager.getConnection("jdbc:mysql://localhost/employees", "sakila", "sakila")
  }


  def load(): Unit = {
    val employeesRDD = new JdbcRDD(sc, connection, "select * from employees.employees", 0, 0, 1)
    println(employeesRDD.count())

  }

}

object Test extends App {
  val conf = new SparkConf().setAppName("test")
  val sc = new SparkContext(conf)
  val l = new Loader(sc)
  l.load()
}

When I run this, I get the following error:

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@323a9221)
    - field (class: ro.mfl.employees.Loader, name: sc, type: class org.apache.spark.SparkContext)
    - object (class ro.mfl.employees.Loader, ro.mfl.employees.Loader@607c6d60)
    - field (class: ro.mfl.employees.Loader$$anonfun$1, name: $outer, type: class ro.mfl.employees.Loader)
    - object (class ro.mfl.employees.Loader$$anonfun$1, <function0>)
    - field (class: org.apache.spark.rdd.JdbcRDD, name: org$apache$spark$rdd$JdbcRDD$$getConnection, type: interface scala.Function0)
    - object (class org.apache.spark.rdd.JdbcRDD, JdbcRDD[0] at JdbcRDD at Loader.scala:17)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (JdbcRDD[0] at JdbcRDD at Loader.scala:17,<function2>))

Has anyone run into this problem?

I tried making the Loader class extend java.io.Serializable, but I got the same error, only now it names org.apache.spark.SparkContext instead of Loader.

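【Comments】:

  • Class.forName("com.mysql.jdbc.Driver") has been obsolete since 2007, and the .newInstance() part was never needed. Is this Scala?
  • Yes, it is Scala. I removed newInstance and get the same error. I don't understand what is deprecated; I don't see Class.forName marked as deprecated in the javadoc.
  • If you are happy with the answer, please also vote for it. Thanks.

Tags: mysql scala jdbc apache-spark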


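【Solution 1】:

Problem:

The problem is that your Loader class cannot be serialized. The connection function passed to JdbcRDD captures the enclosing Loader instance (the $outer field in the serialization stack), and that instance holds the SparkContext in its sc field, which is not serializable.

Try changing Loader to an object, or follow the example given below.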

object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@323a9221)

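This happens because Loader is a class and you pass the SparkContext to it when creating a new instance, so every Loader instance carries the context as a field. When Spark serializes the connection function, the instance and its context get pulled in with it.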
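The capture described above can be reproduced without Spark. The following is a minimal sketch using plain Java serialization; FakeContext, LoaderClass, LoaderObject and CaptureDemo are hypothetical names introduced only for illustration, with FakeContext standing in for SparkContext.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Mirrors the question's Loader: a Serializable class holding a non-serializable field.
class LoaderClass(val ctx: FakeContext) extends Serializable {
  def connection(): String = "connection"
  // The function calls an instance method, so it captures `this`
  // (the $outer field in the question's serialization stack).
  def connectionFn: () => String = () => connection()
}

// The same method on a top-level object: there is no instance to capture.
object LoaderObject {
  def connection(): String = "connection"
  def connectionFn: () => String = () => connection()
}

object CaptureDemo {
  // Returns true if Java serialization of `value` succeeds,
  // false if it hits a NotSerializableException.
  def serializes(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println("class-based function serializes: " +
      serializes(new LoaderClass(new FakeContext).connectionFn))
    println("object-based function serializes: " +
      serializes(LoaderObject.connectionFn))
  }
}
```

The class-based function drags the whole LoaderClass instance, including its FakeContext field, into the serialized graph, which is what fails. The object-based function should serialize cleanly because it refers to LoaderObject statically.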

Following this example (a simple and elegant approach) should work:

import org.apache.spark._
import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}

// An object rather than a class: its methods need no enclosing instance,
// so nothing that holds the SparkContext gets serialized.
object LoadSimpleJdbc {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage: [sparkmaster]")
      sys.exit(1)
    }
    val master = args(0)
    val sc = new SparkContext(master, "LoadSimpleJdbc", System.getenv("SPARK_HOME"))
    // The two ? placeholders are bound to each partition's lower and upper id bounds.
    val data = new JdbcRDD(sc,
      createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
      lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
    println(data.collect().toList)
  }

  /** Opens a new JDBC connection; JdbcRDD calls this on the executors. */
  def createConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden")
  }

  /** Maps one result row to an (Int, String) tuple. */
  def extractValues(r: ResultSet): (Int, String) = {
    (r.getInt(1), r.getString(2))
  }
}

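In general, try to avoid storing the SparkContext in your classes.

Also, take a look at the question "Serialization Exception on spark".

Alternatively, try declaring the SparkContext field as @transient (some users on SO use this approach).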
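As a rough illustration of the @transient approach, here is a minimal sketch that again uses plain Java serialization instead of Spark. StubContext, TransientLoader and TransientDemo are hypothetical names, and StubContext stands in for SparkContext.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class StubContext

// @transient makes Java serialization skip the ctx field, so the instance
// captured by connectionFn can be written without touching StubContext.
class TransientLoader(@transient private val ctx: StubContext) extends Serializable {
  def connection(): String = "connection"
  // Still captures `this`, but `this` is now safe to serialize.
  def connectionFn: () => String = () => connection()
  // A deserialized copy has ctx == null, so the context is only usable
  // on the side that created the loader.
  def hasContext: Boolean = ctx != null
}

object TransientDemo {
  // Serializes `value` with Java serialization and returns the byte count;
  // throws NotSerializableException if any reachable field cannot be written.
  def serializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val loader = new TransientLoader(new StubContext)
    println(s"has context before serialization: ${loader.hasContext}")
    println(s"function serialized to ${serializedSize(loader.connectionFn)} bytes")
  }
}
```

The trade-off is that the transient field is null in any deserialized copy, so code that runs on executors must not touch it. Applied to the question's Loader, this would mean using sc only in methods that run on the driver, such as load().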

【Comments】:
