【Question title】: How do I specify a schema when loading a csv from S3 in Spark with Scala?
【Posted】: 2020-05-17 10:29:03
【Question description】:

I've searched Stack Overflow and tried multiple syntax variations, but none of them works for me. My code is as follows:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType};

val schema1 = (new StructType)
    .add("PASSENGERID", IntegerType, true)
    .add("PCLASS", IntegerType, true)
    .add("NAME", IntegerType, true)
    .add("SEX", StringType, true)
    .add("AGE", DoubleType, true)
    .add("SIBSP", IntegerType, true)
    .add("PARCH", IntegerType, true)
    .add("TICKET", StringType, true)
    .add("FARE", DoubleType, true)
    .add("CABIN", StringType, true)
    .add("EMBARKED", StringType, true)

 val schema2 = StructType(
    StructField("PASSENGERID", IntegerType, true) ::
    StructField("PCLASS", IntegerType, true) ::
    StructField("NAME", IntegerType, true) ::
    StructField("SEX", StringType, true) ::
    StructField("AGE", DoubleType, true) ::
    StructField("SIBSP", IntegerType, true) ::
    StructField("PARCH", IntegerType, true) ::
    StructField("TICKET", StringType, true) ::
    StructField("FARE", DoubleType, true) ::
    StructField("CABIN", StringType, true) ::
    StructField("EMBARKED", StringType, true) :: Nil)

val schema3 = StructType(Array(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)))

val schema4 = StructType(Seq(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
))

val schema5 = StructType(
  List(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
  )
)

/*
val df = spark.read
    .option("header", true)
    .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    .schema(schema)
*/

//this works
val df = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")

df.show(false)
df.printSchema()

//fun errors
val df1 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema1)
val df2 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema2)
val df3 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema3)
val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
val df5 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema5)

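The data is the Kaggle Titanic survival set, with the field names in the header uppercased. I've tried submitting this as a script to spark-shell as well as running the commands manually in spark-shell. spark-shell -i spits out some syntax errors at the dfX reads; if I load things manually, all of the schemas look fine, and every read fails with the same error.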

scala> val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
<console>:26: error: overloaded method value apply with alternatives:
  (fieldIndex: Int)org.apache.spark.sql.types.StructField <and>
  (names: Set[String])org.apache.spark.sql.types.StructType <and>
  (name: String)org.apache.spark.sql.types.StructField
 cannot be applied to (org.apache.spark.sql.types.StructType)
       val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)

I don't understand what I'm doing wrong. I'm using Spark version 2.4.4 on AWS EMR.

【Question discussion】:

    Tags: scala csv apache-spark


    【Solution 1】:

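    Set the inferSchema option to false so that Spark does not infer the schema when loading the data (false is already the default for the CSV reader, so this is optional once an explicit schema is supplied).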

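    Move your .schema call before .csv. .schema(...) is a method of the DataFrameReader returned by spark.read, whereas .csv(...) already returns a DataFrame. On a DataFrame, schema is only an accessor that returns the StructType, so .schema(schema4) is parsed as StructType.apply(schema4), which is what produces the "overloaded method value apply" error.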

    Please check the code below.

    scala> val df1 = spark.read.option("header", true).option("inferSchema", false).schema(schema1).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    df1: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
    
    scala> val df2 = spark.read.option("header", true).option("inferSchema", false).schema(schema2).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    df2: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
    
    scala> val df3 = spark.read.option("header", true).option("inferSchema", false).schema(schema3).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    df3: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
    
    scala> val df4 = spark.read.option("header", true).option("inferSchema", false).schema(schema4).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    df4: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
    
    scala> val df5 = spark.read.option("header", true).option("inferSchema", false).schema(schema5).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    df5: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
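
The ordering can also be tried without S3 access. The following is only a sketch for pasting into spark-shell (for example in :paste mode): the local[*] master, the temporary file, the two-column sample data and the names path, schema, nameField and firstField are assumptions made for illustration and do not come from the original post.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// In spark-shell this returns the existing session; the local master is an
// assumption that only matters when no session exists yet.
val spark = SparkSession.builder().master("local[*]").appName("schema-order-sketch").getOrCreate()

// Hypothetical two-row sample standing in for PASSENGERS.csv.
val path = Files.createTempFile("passengers", ".csv")
Files.write(path, "PASSENGERID,NAME\n1,Braund\n2,Cumings\n".getBytes("UTF-8"))

val schema = new StructType().add("PASSENGERID", IntegerType, true).add("NAME", StringType, true)

// schema(...) belongs to the DataFrameReader, so it has to come before csv(...),
// which is the call that actually returns the DataFrame.
val df = spark.read.option("header", true).schema(schema).csv(path.toString)

// On the DataFrame, `schema` takes no arguments and returns the StructType.
// Anything in parentheses after it goes to StructType.apply, which looks up a
// field by name or index. That is why passing a StructType there does not compile.
val nameField = df.schema("NAME")
val firstField = df.schema(0)
```

Here df.schema("NAME") and df.schema(0) correspond to the (name: String) and (fieldIndex: Int) alternatives listed in the compiler error from the question.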
    
    

    【Discussion】:

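    • Isn't inferSchema false by default? I had actually just caught the ordering of .schema myself. After that I got all nulls, although it was only one misclassified field (NAME); it seems that having it typed wrongly makes everything null... Anyway, it's fixed now. Thanks!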
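
The null rows described in the comment above can be reproduced on a small local file. This is a rough sketch, not the asker's actual fix: the local[*] master, the temporary file, the sample rows and the names wrongSchema, fixedSchema and read are all hypothetical. It declares NAME as IntegerType, as the schemas in the question do, and compares the CSV reader's default PERMISSIVE mode with FAILFAST, which raises an error on the first record that does not match the schema.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// In spark-shell this returns the existing session; the local master is an
// assumption that only matters when no session exists yet.
val spark = SparkSession.builder().master("local[*]").appName("null-rows-sketch").getOrCreate()

// Hypothetical two-row sample standing in for PASSENGERS.csv.
val path = Files.createTempFile("passengers", ".csv")
Files.write(path, "PASSENGERID,NAME\n1,Braund\n2,Cumings\n".getBytes("UTF-8"))

// NAME declared as IntegerType (the mistake) versus StringType (the correction).
val wrongSchema = new StructType().add("PASSENGERID", IntegerType, true).add("NAME", IntegerType, true)
val fixedSchema = new StructType().add("PASSENGERID", IntegerType, true).add("NAME", StringType, true)

// Hypothetical helper: read the sample file with a given schema and parse mode.
def read(schema: StructType, mode: String) =
  spark.read.option("header", true).option("mode", mode).schema(schema).csv(path.toString)

// PERMISSIVE (the default) does not fail; the unparseable NAME comes back as null.
val nameIsNull = read(wrongSchema, "PERMISSIVE").collect().forall(_.isNullAt(1))

// FAILFAST turns the same mismatch into an exception when the data is collected.
val failFast = Try(read(wrongSchema, "FAILFAST").collect())

// With NAME typed as a string, the strict read succeeds.
val names = read(fixedSchema, "FAILFAST").collect().map(_.getString(1)).toSeq
```

How much of a bad row becomes null depends on the Spark version: Spark 2.4 and earlier null out the whole row in PERMISSIVE mode, which matches what the comment reports, while Spark 3.0 and later null only the fields that failed to parse. Reading with FAILFAST while developing a schema makes this kind of type mismatch visible immediately.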