【问题标题】:Replace new line (\n) character in csv file - spark scala替换csv文件中的新行(\ n)字符 - spark scala
【发布时间】:2016-08-27 16:11:51
【问题描述】:

只是为了说明问题,我采用了一个测试集 csv 文件。但在实际情况下,问题必须处理的不仅仅是 TeraByte 数据。

我有一个 CSV 文件,其中的列用引号 ("col1") 括起来。但是当数据导入完成时。一列包含换行符(\n)。当我想将它们保存为 Hive 表时,这会导致我遇到很多问题。

我的想法是用“|”替换 \n 字符火花中的管道。

到目前为止我达到了:

1. val test = sqlContext.load(
        "com.databricks.spark.csv",
        Map("path" -> "test_set.csv", "header" -> "true", "inferSchema" -> "true", "delimiter" -> "," , "quote" -> "\"", "escape" -> "\\" ,"parserLib" -> "univocity" ))#read a csv file

 2.   val dataframe = test.toDF() #convert to dataframe

  3.    dataframe.foreach(println) #print

    4. dataframe.map(row => {
        val row4 = row.getAs[String](4)
        val make = row4.replaceAll("[\r\n]", "|") 
        (make)
      }).collect().foreach(println) #replace not working for me

样本集:

(17 , D73 ,525, 1  ,testing\n    ,  90 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,529, 1  ,once \n again,  10 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,531, 1  ,test3\n      ,  10 ,20.07.2011 ,null ,F10 , R)

预期结果集:

(17 , D73 ,525, 1  ,testing|    ,  90 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,529, 1  ,once | again,  10 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,531, 1  ,test3|      ,  10 ,20.07.2011 ,null ,F10 , R)

什么对我有用:

val rep = "\n123\n Main Street\n".replaceAll("[\\r\\n]", "|") rep: String = |123| Main Street|

但是为什么我不能在元组的基础上做呢?

 val dataRDD = lines_wo_header.map(line => line.split(";")).map(row => (row(0).toLong, row(1).toString, 
                                               row(2).toLong, row(3).toLong, 
                                               row(4).toString, row(5).toLong,
                                               row(6).toString, row(7).toString, row(8).toString,row(9).toString)) 

dataRDD.map(row => {
                val wert = row._5.replaceAll("[\\r\\n]", "|") 
                (row._1,row._2,row._3,row._4,wert,row._6, row._7,row._8,row._9,row._10)
                }).collect().foreach(println)

Spark --version 1.3.1

【问题讨论】:

  • replaceAll("[\\r\\n]", "|") 将所有 'r' 和 'n' 字符替换为 '|'而我们只想替换换行符(\n)。

标签: scala replace apache-spark character newline


【解决方案1】:

spark 2.2 版JIRA 增加了对 CSV 的多行支持,spark 2.2 尚未发布。

我遇到了同样的问题,并在我们 hadoop 输入格式和阅读器的帮助下解决了它。

git 复制 InputFormat 和 reader 类并像这样实现:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

//implementation

 JavaPairRDD<LongWritable, Text> rdd =
                context.
                        newAPIHadoopFile(path, FileCleaningInputFormat.class, null, null, new Configuration());
JavaRDD<String> inputWithMultiline= rdd.map(s -> s._2().toString())

另一种解决方案- 使用 Apache crunch 中的 CSVInputFormat 读取 CSV 文件,然后使用 opencsv 解析每个 CSV 行:

sparkContext.newAPIHadoopFile(path, CSVInputFormat.class, null, null, new Configuration()).map(s -> s._2().toString());

Apache crunch maven 依赖:

 <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-core</artifactId>
      <version>0.15.0</version>
  </dependency>

【讨论】:

    【解决方案2】:

    我的想法是用“|”替换 \n 字符火花中的管道。

    我尝试了 replaceAll 方法,但它不起作用。这是实现相同目的的替代方法:

    val test = sq.load(
            "com.databricks.spark.csv",
            Map("path" -> "file:///home/veda/sample.csv", "header" -> "false", "inferSchema" -> "true", "delimiter" -> "," , "quote" -> "\"", "escape" -> "\\" ,"parserLib" -> "univocity" ))
    
    val dataframe = test.toDF()
    
    val mapped = dataframe.map({
        row => {
        val str = row.get(0).toString()
        var fnal=new StringBuilder(str)
        //replace newLine 
        var newLineIndex=fnal.indexOf("\\n")
        while(newLineIndex != -1){
            fnal.replace(newLineIndex,newLineIndex+2,"|")
            newLineIndex = fnal.indexOf("\\n")                  
        }
    
        //replace carriage returns
        var cgIndex=fnal.indexOf("\\r")
        while(cgIndex != -1){
            fnal.replace(cgIndex,cgIndex+2,"|")
            cgIndex = fnal.indexOf("\\r")                   
        }
    
        (fnal.toString()) //tuple modified
    
        }
    })
    
    mapped.collect().foreach(println)
    

    注意:您可能希望将重复的代码移动到单独的函数中。

    【讨论】:

      【解决方案3】:

      如果您可以使用 Spark SQL 1.5 或更高版本,您可以考虑使用可用于列的functions。假设您不知道(或没有)列的名称,您可以按照以下 sn-p 进行操作:

      val df = test.toDF()
      
      import org.apache.spark.sql.functions._
      val newDF = df.withColumn(df.columns(4), regexp_replace(col(df.columns(4)), "[\\r\\n]", "|"))
      

      如果您知道列的名称,则可以在两次出现时将 df.columns(4) 替换为其名称。

      我希望这会有所帮助。 干杯。

      【讨论】:

      • 谢谢,不幸的是我没有 DataFrame API。需要使用 RDD。
      • @user3560220 您可以使用数据帧并稍后通过调用newDF.rddnewDF.map 将其转换回RDD
      • 我尝试了您的解决方案,但“\n”字符没有被替换。你怎么能解决它。 ?有什么我想念的吗?
      • 实际上,在我的测试中,即使您的解决方案也有效。也许它在您的数据中?是引发错误还是无法替换?
      • @DanieldePaula:我按照您的建议尝试了解决方案,并尝试使用 Windows 中的独立 spark 在本地输出我的文件。当我在 Notepad++ 中打开时,它总是在每行的末尾显示 CRLF,并启用了设置 --> 查看符号。我实际上期待 CRLF 使用替换功能消失。我在这里错过了什么吗? regexp_replace(ColName, "[\\r\\n]", "") as trimmed_column
      猜你喜欢
      • 2015-08-18
      • 2015-11-26
      • 2015-07-07
      • 1970-01-01
      • 2022-12-22
      • 2017-09-03
      • 2023-03-26
      • 2021-09-26
      • 2016-12-17
      相关资源
      最近更新 更多