【发布时间】:2018-12-01 18:09:47
【问题描述】:
我正在尝试复现 Jacek Laskowski 书中的这个示例(this example):以流的方式读取 CSV 文件,并在控制台中输出聚合结果,但由于某种原因,输出没有显示在 IntelliJ 控制台中。
scala> spark.version
res4: String = 2.2.0
我在 SO 的某些地方(1、2、3、4、5)找到了一些参考资料,我尝试了所有方法,但没有解决问题。
这是代码:
package org.sample
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
/**
 * Structured Streaming job that reads semicolon-delimited CSV files (with a
 * header row) from `dataset/stream_in` under the current working directory,
 * counts rows per (carrier, marital_status) pair and writes the complete
 * aggregation result to the console sink.
 *
 * The CSV schema comes from `SchemaDefinition.csvSchema`, which is defined
 * elsewhere in the project.
 */
object App {

  /**
   * Starts the streaming query and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // The separator between the working directory and the relative folder is
    // required: without it the directory name and "dataset" are fused into a
    // single path segment (e.g. ".../my-projectdataset/stream_in"), the file
    // source watches a non-existent location and no rows are ever read.
    val inputDir = s"${new java.io.File(".").getCanonicalPath}/dataset/stream_in"

    // Master and application name are configured directly on the builder;
    // a separate SparkConf is not needed.
    val spark = SparkSession.builder()
      .appName("Spark Structured Streaming Job")
      .master("local[*]")
      .getOrCreate()

    val userRecords = spark.readStream
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .option("latestFirst", "true")
      .schema(SchemaDefinition.csvSchema)
      .load(inputDir + "/*")

    // Expose the streaming DataFrame to Spark SQL under a table name.
    userRecords.createOrReplaceTempView("user_records")

    val transformation = spark.sql(
      """
SELECT carrier, marital_status, COUNT(1) as num_users
FROM user_records
GROUP BY carrier, marital_status
"""
    )

    // "complete" output mode re-emits the whole aggregated table on each trigger.
    val consoleStream = transformation
      .writeStream
      .format("console")
      .option("truncate", false)
      .outputMode("complete")
      .start()

    consoleStream.awaitTermination()
  }
}
我的输出只是:
18/11/30 15:40:31 INFO StreamExecution: Streaming query made progress: {
"id" : "9420f826-0daf-40c9-a427-e89ed42ee738",
"runId" : "991c9085-3425-4ea6-82af-4cef20007a66",
"name" : null,
"timestamp" : "2018-11-30T14:40:31.117Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 2,
"triggerExecution" : 2
},
"eventTime" : {
"watermark" : "1970-01-01T00:00:00.000Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "FileStreamSource[file:/structured-streamming-taskdataset/stream_in/*]",
"startOffset" : null,
"endOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a62e7ef"
}
}
【问题讨论】:
标签: scala apache-spark