【问题标题】:SPARK: parse each RDD element with a function of a "not serializable"objectSPARK:使用“不可序列化”对象的函数解析每个 RDD 元素
【发布时间】:2015-10-21 16:36:04
【问题描述】:

我正在 Spark 中读取一个大型 CSV 文件,我想使用 CSVParser 库 (au.com.bytecode.opencsv.CSVParser) 解析每一行

这是我的代码:

val parsedLines = sc.textFile("path/to/a/csv/file.csv").map(line => {
     val parser = new CSVParser(',')
     try{
       parser.parseLine(line)
     }catch{
       case e: Exception => "Error"
     }
})

调用parser.parseLine(line) 的结果是Array[String]

我无法在地图之外创建解析器,因为 CSVParser 类不可序列化。

可以像我在上面的代码中那样在 map 函数中调用 new CSVParser(',') 吗?...如果是,为什么?...如果不是,为什么?

是否为 RDD 的每个元素调用 new CSVParser(',')

有没有更有效的方法来以不同的方式进行相同的处理?

【问题讨论】:

    标签: scala serialization apache-spark


    【解决方案1】:

    就我个人而言,我会简单地使用来自 Databricks 的 spark-csv 包来解析 CSV 文件,但如果您坚持使用 CSVParser,则应该使用 mapPartitions 而不是 map。当您使用map 时,确实会为RDD 中的每个元素调用new CSVParser(),而如果您使用mapPartitions,则每个分区仅调用一次。

    【讨论】:

    【解决方案2】:

    var parser = new CSVParser(',');

    你可以定义如下函数并调用..

     private def getTokens(value: String): Array[String] = {
        if (!"".equals(value)) {
          var tokens: Array[String] = parser.parseLine(value);
            return tokens;
        }
        return null;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-08-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-28
      • 2016-12-16
      • 1970-01-01
      相关资源
      最近更新 更多