【问题标题】:Spark - serialization problem with parsing files using OpenCSVSpark - 使用 OpenCSV 解析文件的序列化问题
【发布时间】:2019-05-17 23:06:27
【问题描述】:

我正在使用 Spark 处理 csv 文件。最近我用 opencsv 替换了手动 CSV 行解析。这是简化的代码

public class Main {

    public static void main(String[] args) {

        CSVParser parser = new CSVParserBuilder()
                .withSeparator(';')
                .build();

        SparkConf cfg = new SparkConf()
                .setMaster("local[4]")
                .setAppName("Testapp");
        JavaSparkContext sc = new JavaSparkContext(cfg);

        JavaRDD<String> textFile = sc.textFile("testdata.csv", 1);

        List<String> categories = textFile
                .map(line -> parser.parseLine(line)[10])
                .collect();
        System.out.println(categories);
    }
}

很遗憾,该代码不起作用。它产生一个异常

Caused by: java.io.NotSerializableException: com.opencsv.CSVParser
Serialization stack:
    - object not serializable (class: com.opencsv.CSVParser, value: com.opencsv.CSVParser@1290c49)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class test.Main, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic test/Main.lambda$main$49bd2722$1:(Lcom/opencsv/CSVParser;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class test.Main$$Lambda$19/429639728, test.Main$$Lambda$19/429639728@72456279)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 12 more

Spark 似乎试图序列化 lambda 表达式,但不知何故,lamba 表达式一直引用parser,这会导致上述错误。

问题是:有什么方法可以避免该异常并在传递给 Spark 的 lambda 表达式中使用不可序列化库?我真的不想实现自己的 csv 解析器。

【问题讨论】:

    标签: java csv apache-spark rdd opencsv


    【解决方案1】:

    我意识到我的问题有一个非常简单的解决方案。任何导致序列化问题的外部库使用都可能包含在静态方法中。对parser 的引用被方法parse 隐藏。这种方法显然不是一个完美的解决方案,但很有效。

    public class Main {
    
        private static CSVParser parser = new CSVParserBuilder()
                .withSeparator(';')
                .build();
    
        public static void main(String[] args) {
            SparkConf cfg = new SparkConf()
                    .setMaster("local[4]")
                    .setAppName("Testapp");
            JavaSparkContext sc = new JavaSparkContext(cfg);
    
            JavaRDD<String> textFile = sc.textFile("testdata.csv", 1);
    
            List<String> categories = textFile
                    .map(line -> parse(line)[0])
                    .collect();
            System.out.println(categories);
        }
    
        static String[] parse(String line) throws IOException {
            return parser.parseLine(line);
        }
    }
    

    【讨论】:

      【解决方案2】:

      Spark 支持开箱即用的 CSV 文件

      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.Dataset;
      
      Dataset<Row> df = spark.read().format("csv")
                            .option("sep", ";")
                            .option("header", "true") //or "false" if no headers
                            .load("filename.csv");
      

      编辑(提升对主要答案的评论)

      如果你真的需要它,你可以使用 df.javaRDD() 从 DataFrame 中获取 RDD 尽管最好使用 DataSet/DataFrame API(例如,参见 here

      【讨论】:

      • RDD api中是否有类似的解决方案?
      • 否,但您可以使用 df.javaRDD() 从数据集中获取 RDD(尽管建议使用数据集/数据帧 API)
      • 我改变了使用内部解析的方法。顺便说一句,我找到了不同的解决方案。看@我的回答。
      猜你喜欢
      • 2018-06-04
      • 1970-01-01
      • 2018-07-05
      • 2017-10-01
      • 2010-12-14
      • 2015-05-26
      • 1970-01-01
      • 2017-02-04
      • 2016-10-09
      相关资源
      最近更新 更多