【发布时间】:2017-07-26 02:20:03
【问题描述】:
我正在尝试使用 spark 2.1.0 SparkStreaming 程序读取文件。 csv 文件存储在我本地机器上的一个目录中,并尝试使用 writestream parquet 和我本地机器上的一个新文件。但是每当我尝试在 .parquet 中总是出错或获取空白文件夹时。
这是我的代码:
case class TDCS_M05A(TimeInterval:String ,GantryFrom:String ,GantryTo:String ,VehicleType:Integer ,SpaceMeanSpeed:Integer ,CarTimes:Integer)
object Streamingcsv {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("Streamingcsv")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.types._
val schema = StructType(
StructField("TimeInterval",DateType, false) ::
StructField("GantryFrom", StringType, false) ::
StructField("GantryTo", StringType, false) ::
StructField("VehicleType", IntegerType, false) ::
StructField("SpaceMeanSpeed", IntegerType, false) ::
StructField("CarTimes", IntegerType, false) :: Nil)
import org.apache.spark.sql.Encoders
val usrschema = Encoders.product[TDCS_M05A].schema
val csvDF = spark.readStream
.schema(usrschema) // Specify schema of the csv files
.csv("/home/hduser/IdeaProjects/spark2.1/data/*.csv")
val query = csvDF.select("GantryFrom").where("CarTimes > 0")
query
.writeStream
.outputMode("append")
.format("parquet")
.option("checkpointLocation", "checkpoint")
.start("/home/hduser/IdeaProjects/spark2.1/output/")
//.parquet("/home/hduser/IdeaProjects/spark2.1/output/")
//.start()
query.awaitTermination()
}
我参考页面How to read a file using sparkstreaming and write to a simple file using Scala? 还是不行,请帮帮我,谢谢。
【问题讨论】:
标签: scala csv apache-spark spark-streaming parquet