【问题标题】:Getting java.io.NotSerializableException while mapping a JavaRDD映射 JavaRDD 时获取 java.io.NotSerializableException
【发布时间】:2016-11-28 06:58:56
【问题描述】:

以下是当我尝试将作业分派给执行程序时导致 java.io.NotSerializableException 的代码。

    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {

        /**
         * Serial Version Id
         */
        private static final long serialVersionUID = 6766320395808127072L;

        @Override
        public String call(Row row) throws Exception {
            return row.mkString(dataFormat.getDelimiter());
        }
    });

但是,当我执行以下操作时,任务已成功序列化:

JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
List<String> dataList = rddToWrite.collect().stream().parallel()
                           .map(row -> row.mkString(dataFormat.getDelimiter()))
                           .collect(Collectors.<String>toList());
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext());
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList);

谁能帮我指出我在这里做错了什么?

编辑: dataFormat 是编写包含此代码的函数的类中的私有成员字段。它是一个 DataFormat 类的对象,它定义了两个字段,即 spark 数据格式(例如“com.databricks.spark.csv”)和分隔符(例如“\t”)。

【问题讨论】:

  • dataFormat 是什么?
  • dataFormat是局部变量还是封闭类的字段?

标签: java serialization apache-spark


【解决方案1】:

new Function ...创建的匿名类需要引用封闭实例,序列化函数需要序列化封闭实例,包括dataFormat所有其他字段。如果该类未标记为Serializable,或具有任何不可序列化的非transient 字段,则它将不起作用。即使它确实如此,它的性能也会比必要的更差。

不幸的是,要完全解决这个问题,您需要创建一个命名的静态内部类(或只是一个单独的类),它甚至不能是本地的(因为匿名和 local classes in Java 都不能是静态的):

static class MyFunction extends Function<Row, String> {
    private String delimiter;
    private static final long serialVersionUID = 6766320395808127072L;

    MyFunction(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    public String call(Row row) throws Exception {
        return row.mkString(delimiter);
    }
}

然后

JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter()));

【讨论】:

    【解决方案2】:

    当您访问dataFormat 时,表示this.dataFormat。 所以spark会尝试序列化this,遇到NotSerializableException

    尝试制作一个本地副本,例如:

    DataFormat dataformat = this.dataformat;
    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() ...
    

    有关详细信息,请参阅 http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

    【讨论】:

    • 至少在快速测试中,即使是不访问封闭实例的任何方法或字段的匿名类仍然具有对它的引用,因此会尝试对其进行序列化。不过,也许我做错了什么……
    • 阿列克谢是对的!这仍在序列化封闭的实例。导致同样的问题。
    • 是的,他是对的。我对java中的匿名函数有一些误解。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-08-17
    • 2023-04-01
    • 2018-08-23
    • 1970-01-01
    • 1970-01-01
    • 2020-11-17
    • 1970-01-01
    相关资源
    最近更新 更多