【Question title】: Spark Structured Streaming output not displaying in IntelliJ console
【Posted】: 2018-12-01 18:09:47
【Question】:

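I'm trying to reproduce this example from Jacek Laskowski's book, which reads CSV files and aggregates the data to the console, but for some reason the output is not displayed in the IntelliJ console.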

scala> spark.version
res4: String = 2.2.0

I found some references on SO (1, 2, 3, 4, 5) and tried all of them, but none of them solved the problem.

Here is the code:

package org.sample

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object App {
  def main(args : Array[String]): Unit = {

    val DIR = new java.io.File(".").getCanonicalPath + "dataset/stream_in"

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Spark Structured Streaming Job")

    val spark = SparkSession.builder()
      .appName("Spark Structured Streaming Job")
      .master("local[*]")
      .getOrCreate()

    val reader = spark.readStream
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .option("latestFirst", "true")
      .schema(SchemaDefinition.csvSchema)
      .load(DIR + "/*")

    reader.createOrReplaceTempView("user_records")

    val tranformation = spark.sql(
      """
        SELECT carrier, marital_status, COUNT(1) as num_users
        FROM user_records
        GROUP BY carrier, marital_status
      """
    )

    val consoleStream = tranformation
      .writeStream
      .format("console")
      .option("truncate", false)
      .outputMode("complete")
      .start()

    consoleStream.awaitTermination()
  }
}

All I get as output is:

18/11/30 15:40:31 INFO StreamExecution: Streaming query made progress: {
  "id" : "9420f826-0daf-40c9-a427-e89ed42ee738",
  "runId" : "991c9085-3425-4ea6-82af-4cef20007a66",
  "name" : null,
  "timestamp" : "2018-11-30T14:40:31.117Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 2,
    "triggerExecution" : 2
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/structured-streamming-taskdataset/stream_in/*]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a62e7ef"
  }
}
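The description of the FileStreamSource above shows the watched path as `file:/structured-streamming-taskdataset/stream_in/*`, which suggests that the separator between the project directory and `dataset` went missing: `getCanonicalPath` returns the path without a trailing separator, so `DIR` is built by gluing the two strings together. A minimal plain-Scala comparison, assuming a hypothetical `StreamPaths` helper and a hypothetical `/proj` base directory (neither is from the question):

```scala
import java.io.File
import java.nio.file.Paths

// Hypothetical helper object, for illustration only.
object StreamPaths {
  // What the question does: plain string concatenation, no separator added.
  def concatenated(base: String, rel: String): String = base + rel

  // Lets java.nio.file insert the platform separator between the parts.
  def joined(base: String, rel: String): String = Paths.get(base, rel).toString

  def main(args: Array[String]): Unit = {
    val base = new File(".").getCanonicalPath // no trailing separator
    println(concatenated(base, "dataset/stream_in")) // separator missing before "dataset"
    println(joined(base, "dataset/stream_in"))
  }
}
```

Passing a relative path such as "dataset/stream_in" straight to .load(), as the answer does, avoids the concatenation altogether; it is then resolved against the working directory IntelliJ runs the program from.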

【Discussion】:

    Tags: scala apache-spark


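    【Answer 1】:

    I rewrote the file and it now works for me.

    Differences:

    1. Removed the unnecessary conf: with SparkSession there is no need to create a SparkConf.
    2. Using /* in .load() did not work; what worked was passing just the path dataset/stream_in.
    3. The data in tranformation was wrong (its fields did not match the file).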
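Point 3 is easy to trip over, and note that the question's reader splits on ";" while the answer's reader uses ",". One quick sanity check before starting the stream is to split the file's header line with the same delimiter the reader uses and compare it with the schema's column names. A sketch, assuming a hypothetical `HeaderCheck` helper (plain Scala, not a Spark API):

```scala
import java.util.regex.Pattern

// Hypothetical helper, not a Spark API: checks a CSV header line against the
// column names a schema expects, splitting on the same delimiter as the reader.
object HeaderCheck {
  // Splits on the literal delimiter (quoted so that ";" or "|" are not treated as regex).
  def columns(headerLine: String, sep: String): Seq[String] =
    headerLine.split(Pattern.quote(sep), -1).map(_.trim).toSeq

  // None if the header matches the expected names in order, otherwise a message.
  def problem(headerLine: String, expected: Seq[String], sep: String): Option[String] = {
    val actual = columns(headerLine, sep)
    if (actual == expected) None
    else Some(s"expected [${expected.mkString(", ")}] but found [${actual.mkString(", ")}]")
  }
}
```

For example, splitting the header "id,name,city" on ";" yields a single column named "id,name,city", which makes a delimiter mismatch obvious before any streaming query is started.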

    Final code:

    package org.sample
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    object StreamCities {
    
      def main(args: Array[String]): Unit = {
    
        // Silence Spark (org.*) and Akka log output in the console
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)
    
        val spark = SparkSession.builder()
          .appName("Spark Structured Streaming get CSV and aggregate")
          .master("local[*]")
          .getOrCreate()
    
        // 01. Schema definition: the structure of our CSV file.
        // It could live in a separate class, but for simplicity
        // it is kept here
        val csvSchema = StructType(Seq(
          StructField("id", StringType, nullable = true),
          StructField("name", StringType, nullable = true),
          StructField("city", StringType, nullable = true)
        ))
    
        // 02. Read the stream: create a DataFrame representing the
        // stream of CSV files, parsed with our schema. The source is
        // the folder passed to .load()
        val users = spark.readStream
          .format("csv")
          .option("sep", ",")
          .option("header", true)
          .schema(csvSchema)
          .load("dataset/stream_in")
    
        // 03. Aggregate the stream: the "complete" output mode used in
        // step 04 requires a streaming aggregation. We can build it with
        // the untyped API or with Spark SQL
    
        // 03.1: Aggregation using the untyped API
        //val aggUsers = users.groupBy("city").count()
    
        // 03.2: Aggregation using Spark SQL
        users.createOrReplaceTempView("user_records")
    
        val aggUsers = spark.sql(
          """
            SELECT city, COUNT(1) AS num_users
            FROM user_records
            GROUP BY city"""
        )
    
        // Print the schema of our aggregation
        // (printSchema() prints by itself and returns Unit)
        aggUsers.printSchema()
    
        // 04. Output the stream: write it to the console; as new files
        // land in the folder Spark is watching, the results are updated
        val consoleStream = aggUsers.writeStream
          .outputMode("complete")
          .format("console")
          .start()
    
        // Block until the query is stopped
        consoleStream.awaitTermination()
      }
    }
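Step 04 relies on new files showing up in dataset/stream_in while the query runs. Spark's file source expects each file to appear in the watched folder atomically (the Structured Streaming programming guide notes this can usually be done with a file move), so a test file should be written elsewhere first and then moved in. A sketch, assuming a hypothetical `CsvDropper` helper, a separate staging folder on the same file system, and the id,name,city column order from csvSchema above:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical test helper, not part of Spark.
object CsvDropper {
  // Must match the column order of csvSchema in the answer's code.
  val Header = "id,name,city"

  // Writes the rows to `staging` first, then moves the finished file into
  // `target` (the folder the stream watches). Assumes values contain no
  // commas, quotes or newlines, since nothing is escaped.
  def drop(staging: Path, target: Path, fileName: String,
           rows: Seq[(String, String, String)]): Path = {
    Files.createDirectories(staging)
    Files.createDirectories(target)
    val lines = Header +: rows.map { case (id, name, city) => s"$id,$name,$city" }
    val tmp = staging.resolve(fileName + ".tmp")
    Files.write(tmp, (lines.mkString("\n") + "\n").getBytes(StandardCharsets.UTF_8))
    // ATOMIC_MOVE throws AtomicMoveNotSupportedException if the two
    // folders are on different file systems.
    Files.move(tmp, target.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

With the answer's query running, calling something like CsvDropper.drop(Paths.get("dataset/staging"), Paths.get("dataset/stream_in"), "cities-001.csv", Seq(("1", "Ana", "Lisbon"))) should let the next micro-batch pick up the file and print a refreshed table; the staging folder name is only an example.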
    

