[Posted]: 2019-08-20 02:05:58
[Question]:
I have the following code, which reads and processes Kafka data using Structured Streaming:
import org.apache.spark.sql.{ForeachWriter, SparkSession}

object ETLTest {
  // Target type for .as[Record]; fields are matched to columns by name
  case class Record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
    val spark = SparkSession
      .builder
      .appName("Test JOB")
      .master("local[*]")
      .getOrCreate()
    // Needed for .as[Record] and the $"..." column syntax
    import spark.implicits._

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "...")
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)", "CAST(topic AS STRING)")

    // Prints every record it receives
    val sdvWriter = new ForeachWriter[Record] {
      def open(partitionId: Long, version: Long): Boolean = true
      def process(record: Record): Unit = {
        println("record:: " + record)
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val sdvDF = kafkaStreamingDF
      .as[Record]
      .filter($"value".isNotNull)

    // DOES NOT WORK
    /*val query = sdvDF
      .writeStream
      .format("console")
      .start()
      .awaitTermination()*/

    // WORKS
    /*val query = sdvDF
      .writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()
    */
  }
}
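I run this code from the IntelliJ IDEA IDE. When I use foreach(sdvWriter), I can see the records consumed from Kafka, but when I use .writeStream.format("console"), I don't see any records. My assumption is that the console write stream maintains some kind of checkpoint and believes it has already processed all the records. Is that the case? Am I missing something obvious here?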
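One way to test the checkpoint assumption is to give the console query an explicit, empty checkpoint directory, so that no previously recorded offsets can be picked up and `startingOffsets` applies. The following is only a sketch under stated assumptions, not a confirmed fix: the object name `ConsoleSinkCheck` and the temp-directory prefix are hypothetical, the `"..."` placeholders are kept from the question, and lowering the log level is just a guess that IDE log output might be hiding the console batches.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

// Hypothetical helper object, not part of the original question
object ConsoleSinkCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Console sink check")
      .master("local[*]")
      .getOrCreate()

    // Assumption: less INFO logging makes console-sink batches easier to spot
    spark.sparkContext.setLogLevel("WARN")

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...") // placeholder from the question
      .option("subscribe", "...")               // placeholder from the question
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)")

    // A brand-new, empty directory holds no offsets from earlier runs
    val checkpointDir = Files.createTempDirectory("console-sink-check").toString

    val query = df
      .writeStream
      .format("console")
      .outputMode("append")
      .option("truncate", "false") // print full values instead of cut-off columns
      .option("checkpointLocation", checkpointDir)
      .start()

    query.awaitTermination()
  }
}
```

If records show up with a fresh checkpoint directory, stale checkpoint state becomes a more plausible explanation; if they still do not, the cause is probably somewhere else.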
[Discussion]:
Tags: scala apache-spark spark-structured-streaming