【Question title】: Error when running local Spark with JdbcRDD
【Posted】: 2015-10-23 19:42:57
【Question】:

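I am trying to run a Spark job locally that reads the contents of a MySQL table (on my local machine) into a JdbcRDD. I adapted the source code below from examples found online so that it reads the element table and loads all of its columns.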

private static final JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("SparkJdbc").setMaster("local[*]"));

private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/ddm";
private static final String MYSQL_USERNAME = "root";
private static final String MYSQL_PWD = "root";

public static void main(String[] args) {
    DbConnection dbConnection = new DbConnection(MYSQL_DRIVER,
            MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);

    // Load data from MySQL
    JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(sc.sc(), dbConnection,
            "select * from element where elementid >= ? and elementid <= ?",
            1000, 1100, 10, new MapResult(),
            ClassManifestFactory$.MODULE$.fromClass(Object[].class));

    // Convert to JavaRDD
    JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD,
            ClassManifestFactory$.MODULE$.fromClass(Object[].class));

    // Join first name and last name
    List<String> employeeFullNameList = javaRDD.map(
            new Function<Object[], String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String call(final Object[] record) throws Exception {
                    return record[2] + " " + record[3];
                }
            }).collect();

    for (String fullName : employeeFullNameList) {
        System.out.println(fullName);
    }
}

static class DbConnection extends AbstractFunction0<Connection> implements
        Serializable {

    private static final long serialVersionUID = 1L;
    private String driverClassName;
    private String connectionUrl;
    private String userName;
    private String password;

    public DbConnection(String driverClassName, String connectionUrl,
            String userName, String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public Connection apply() {
        try {
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            // Exception is swallowed: a missing driver class goes unreported.
        }

        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);

        Connection connection = null;
        try {
            connection = DriverManager.getConnection(connectionUrl,
                    properties);
        } catch (SQLException e) {
            // Exception is swallowed: on failure, connection stays null.
        }

        return connection;
    }
}

static class MapResult extends AbstractFunction1<ResultSet, Object[]>
        implements Serializable {
    private static final long serialVersionUID = 1L;

    public Object[] apply(ResultSet row) {
        return JdbcRDD.resultSetToObjectArray(row);
    }
} 

However, when I run the code I get a NullPointerException.

15/08/01 08:27:23 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at org.apache.spark.rdd.JdbcRDD$$anon$1.<init>(JdbcRDD.scala:79)
at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:74)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I looked up JdbcRDD.scala on GitHub; line 79 is where the SQL statement is prepared:

 val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

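So that is the statement that fails. I have supplied all the required details, yet it throws a NullPointerException. Can anyone help me find what I am doing wrong?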
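
Because the failing line calls conn.prepareStatement(...), the NullPointerException is most likely caused by conn itself being null. In the code above, DbConnection.apply() returns null whenever DriverManager.getConnection throws, since both catch blocks are empty. The following is a minimal sketch of a connection helper that reports the underlying failure instead; the class name FailFastConnection and its open method are hypothetical and not part of the original post.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper (not part of the original post): opens a JDBC
// connection and fails loudly instead of returning null.
final class FailFastConnection {

    private FailFastConnection() {
    }

    static Connection open(String driverClassName, String connectionUrl,
            String userName, String password) {
        try {
            // Same driver lookup as DbConnection.apply() in the question.
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            // Surface the missing driver instead of swallowing it.
            throw new IllegalStateException(
                    "JDBC driver not on classpath: " + driverClassName, e);
        }
        try {
            return DriverManager.getConnection(connectionUrl, userName, password);
        } catch (SQLException e) {
            // Surface a bad URL, wrong credentials, or an unreachable server.
            throw new IllegalStateException(
                    "Cannot connect to " + connectionUrl, e);
        }
    }
}
```

Under these assumptions, DbConnection.apply() could simply return FailFastConnection.open(driverClassName, connectionUrl, userName, password), so the task would fail with the real cause rather than a NullPointerException inside JdbcRDD.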

【Question discussion】:

    Tags: mysql apache-spark local


    【Solution 1】:

    I had overlooked my import statements. After adding the following line, I was able to run the Spark program locally.

    import java.sql.{PreparedStatement, Connection, ResultSet}
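
    The import line above uses Scala syntax, while the code in the question is Java. As a rough sketch, the Java-syntax equivalents would look like the block below; the JdbcImports class is a hypothetical placeholder so that the snippet compiles on its own. The question's code also references DriverManager and SQLException from java.sql, which would need their own imports.

```java
// Java-syntax equivalents of the Scala import line above.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Hypothetical holder class so the imports compile on their own.
final class JdbcImports {
    static final Class<?>[] TYPES = {
        Connection.class, PreparedStatement.class, ResultSet.class
    };
}
```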
    

    【Discussion】:
