【问题标题】:How to read .csv files using spark streaming and write to parquet file using Scala?如何使用火花流读取 .csv 文件并使用 Scala 写入镶木地板文件?
【发布时间】:2017-07-26 02:20:03
【问题描述】:

我正在尝试使用 spark 2.1.0 SparkStreaming 程序读取文件。 csv 文件存储在我本地机器上的一个目录中,并尝试使用 writestream parquet 和我本地机器上的一个新文件。但是每当我尝试在 .parquet 中总是出错或获取空白文件夹时。

这是我的代码:

case class TDCS_M05A(TimeInterval:String ,GantryFrom:String ,GantryTo:String ,VehicleType:Integer ,SpaceMeanSpeed:Integer ,CarTimes:Integer)

object Streamingcsv {
  def main(args: Array[String]) {

    val spark = SparkSession
      .builder
      .appName("Streamingcsv")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    import org.apache.spark.sql.types._


    val schema = StructType(
        StructField("TimeInterval",DateType, false) ::
        StructField("GantryFrom", StringType, false) ::
        StructField("GantryTo", StringType, false) ::
        StructField("VehicleType", IntegerType, false) ::
        StructField("SpaceMeanSpeed", IntegerType, false) ::
        StructField("CarTimes", IntegerType, false) ::  Nil)


    import org.apache.spark.sql.Encoders

    val usrschema = Encoders.product[TDCS_M05A].schema

    val csvDF = spark.readStream
      .schema(usrschema) // Specify schema of the csv files
      .csv("/home/hduser/IdeaProjects/spark2.1/data/*.csv")


    val query = csvDF.select("GantryFrom").where("CarTimes > 0")

    query
      .writeStream
      .outputMode("append")
      .format("parquet")
      .option("checkpointLocation", "checkpoint")
      .start("/home/hduser/IdeaProjects/spark2.1/output/")
      //.parquet("/home/hduser/IdeaProjects/spark2.1/output/")
      //.start()

    query.awaitTermination()
  }

我参考页面How to read a file using sparkstreaming and write to a simple file using Scala? 还是不行,请帮帮我,谢谢。

【问题讨论】:

    标签: scala csv apache-spark spark-streaming parquet


    【解决方案1】:

    在开始之前,您应该确保检查点目录存在(或创建它)。您的查询实现还应包括一个与您的 DF 分开的查询的 val。

    如果你不需要,不要放在一个包含空参数的对象中,这样你就可以直接访问其他方法,比如

     query.lastProgress
     query.stop
    

    (仅举几例)

    将代码的后半部分更改为如下所示。

    import org.apache.spark.sql.streaming.OutputMode
    
    val csvQueriedDF = csvDF.select("GantryFrom").where("CarTimes > 0")
    
    val query = csvQueriedDF
      .writeStream
      .outputMode(OutputMode.Append())
      .format("parquet")
      .option("checkpointLocation", "/home/hduser/IdeaProjects/spark2.1/partialTempOutput")
      .option("path", "/home/hduser/IdeaProjects/spark2.1/output/")
      .start()
    

    祝你好运!

    【讨论】:

      猜你喜欢
      • 2018-05-19
      • 2018-06-02
      • 2016-08-31
      • 2017-04-28
      • 1970-01-01
      • 2016-01-18
      • 2019-02-11
      • 2018-04-05
      • 2019-01-12
      相关资源
      最近更新 更多