【问题标题】:Issues in loading multiple csv in spark with scala使用 scala 在 spark 中加载多个 csv 的问题
【发布时间】:2019-01-12 18:17:12
【问题描述】:

我正在将 Spark2.3Scala 一起使用并尝试从一个目录加载多个 csv 文件,我遇到了一个问题,它加载文件但错过了一些列他们

我有以下示例文件

test1.csv

Col1,Col2,Col3,Col4,Col5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5

test2.csv

Col1,Col2,Col3,Col4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4

test3.csv

Col1,Col2,Col3,Col4,Col6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6

test4.csv

Col1,Col2,Col5,Col4,Col3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3

我想要做的是将所有这些文件加载​​到一个数据框中,所有列都在 4 个文件中,但是当我尝试使用以下代码加载文件时

val dft = spark.read.format("csv").option("header", "true").load("path/to/directory/*.csv")

它会加载 csv,但会丢失 csv 中的一些列。

这是 dft.show()

的输出
+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col6|
+----+----+----+----+----+
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   3|   4|   5|
| aaa|   2|   3|   4|   5|
+----+----+----+----+----+

我希望它是这样的

+----+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|Col6|
+----+----+----+----+----+----+
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
+----+----+----+----+----+----+

请指导我的代码有什么问题? 或者还有其他有效的方法吗?

谢谢

【问题讨论】:

  • 与同一问题相关的一些事情stackoverflow.com/questions/48999381/…
  • Spark 的 CSV 阅读器不支持缺失列。你必须找到另一种方式。你能告诉我你有多少文件以及它们有多大吗?另外,当该列不存在时,您期望什么?

标签: scala csv apache-spark apache-spark-sql


【解决方案1】:

我找到了我试图解决的问题的解决方案,所以我想我应该将这个分享给任何试图实现相同输出的人。

我用 Parquet 解决了不同文件中的合并任务,有一些共同的列。

这里是代码

val conf = new SparkConf()
      .setAppName("Exercise")
      .setMaster("local")
val sc = new SparkContext(conf)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val spark = SparkSession
      .builder()
      .appName("Spark Sql Session")
      .config("spark.some.config.option", "test")
      .getOrCreate()

val filepath = sc.wholeTextFiles("path/to/MergeFiles/*.txt").keys
val list = filepath.collect().toList
var i = 1
list.foreach{ path  =>
val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .option("header", "true")
    .load(path)
df.write.parquet("data/test_tbl/key="+ i)
    i +=1
}
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_tbl")

mergedDF.write.format("csv").save("target/directory/for/mergedFiles")

下面是mergedDF.show()的输出

+----+----+----+----+----+----+---+
|Col1|Col2|Col3|Col4|Col6|Col5|key|
+----+----+----+----+----+----+---+
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |3  |
|aaa |2   |3   |4   |null|5   |3  |
+----+----+----+----+----+----+---+

【讨论】:

    【解决方案2】:

    如果每个单独的文件不是太大,您可以使用wholeTextFile 并自己解析您的文件,如下所示:

    val columns = (1 to 6).map("Col"+_)
    val rdd = sc.wholeTextFiles("path_to_files/*")
        .map(_._2.split("\\n"))
        .flatMap(x=> { 
            // We consider the first line as the header
            val cols = x.head.split(",")
            // Then we flatten the remaining lines and shape each of them 
            // as a list of tuples (ColumnName, content).
            x.tail
                .map(_.split(","))
                .map(row => row.indices.map(i => cols(i) -> row(i))) 
        })
        .map(_.toMap)
        // Here we take the list of all the colmuns and map each of them to
        // its value if it exists, null otherwise.
        .map(map => columns.map(name => map.getOrElse(name, null) ))
        .map(Row.fromSeq _)
    

    此代码使用wholeTextFile 将每个文件放在一条记录中(这就是文件不能太大的原因),使用第一行来确定存在哪些列以及以何种顺序,创建一个将列名映射到的 Map值并在缺少值时将其转换为具有空值的行。然后,数据准备好进入数据框:

    val schema = StructType(
        columns.map(name => StructField(name, StringType, true))
    )
    spark.createDataFrame(rdd, schema).show()
    

    【讨论】:

    • 感谢您的回复,但第一部分在“map(map => columns.map(name => map.getOrElse(name, null)))”这一行中的“columns”上给出了错误,即“ cmd22.sc:1:未找到:值列”。你能告诉我它指的是哪些列吗?
    • 对,我应该早点定义columns ;-) 我编辑了答案。
    • 感谢编辑它解决了第一部分的问题,现在它在 StructField "not found: value StructField" 上给出错误,然后我为它添加了导入语句并得到了这个错误...... ..“方法应用的参数不足:(名称:字符串,数据类型:org.apache.spark.sql.types.DataType,可为空:布尔,元数据:org.apache.spark.sql.types.Metadata)org.apache .spark.sql.types.StructField in object StructField.未指定值参数dataType.val schema = StructField("
    • 这是我添加到代码第二部分的导入语句.... "import org.apache.spark.sql.types.StructField"
    • 您甚至可以输入import org.apache.spark.sql.types._import org.apache.spark.sql.types.{ StructType, StructField, StringType},因为您需要这三个。我意识到我又犯了一个错字。我在模式声明中使用了 StructField 而不是 StructType。我更正了答案。
    猜你喜欢
    • 2015-10-22
    • 2020-02-26
    • 1970-01-01
    • 1970-01-01
    • 2018-08-13
    • 1970-01-01
    • 1970-01-01
    • 2018-06-26
    • 1970-01-01
    相关资源
    最近更新 更多