鉴于val spark=SparkSession.builder().getOrCreate() 我猜您使用的是 Spark 2.x。
首先,请注意 Spark 2.x 原生支持 CSV 格式,因此不需要通过长名称指定格式,即org.apache.spark.csv,而只需csv。
spark.read.format("csv")...
由于您使用csv 运算符,因此是隐含的 CSV 格式,因此您可以跳过/删除format("csv")。
// note that I removed format("csv")
spark.read.option("header", true).csv("/home/cloudera/Book1.csv")
您有很多选择,但我强烈建议您使用案例类......只是模式。如果您对如何在 Spark 2.0 中执行此操作感到好奇,请参阅最后一个解决方案。
强制转换运算符
您可以使用cast 运算符。
scala> Seq("1").toDF("str").withColumn("num", 'str cast "int").printSchema
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
使用结构类型
您还可以将自己的手工架构与StructType 和StructField 一起使用,如下所示:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("str", StringType, true) ::
StructField("num", IntegerType, true) :: Nil)
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
val q = spark.
read.
option("header", true).
schema(schema).
csv("numbers.csv")
scala> q.printSchema
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
架构 DSL
我最近发现很有趣的是所谓的Schema DSL。上面使用StructType 和StructField 构建的架构可以重写如下:
import org.apache.spark.sql.types._
val schema = StructType(
$"str".string ::
$"num".int :: Nil)
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
// or even
val schema = new StructType().
add($"str".string).
add($"num".int)
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
编码器
编码器非常易于使用,很难相信您不会想要它们,即使只是构建架构而不处理StructType、StructField 和DataType。
// Define a business object that describes your dataset
case class MyRecord(str: String, num: Int)
// Use Encoders object to create a schema off the business object
import org.apache.spark.sql.Encoders
val schema = Encoders.product[MyRecord].schema
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = false)