Structured streaming是spark 2.0以后新增的用于实时处理的技术。与spark streaming不同的是,Structured streaming打开了数据源到数据落地之间的限制,它这两个端整合起来,形成真正的“流”,形成一张巨大的表。同时也正因为此特点,真正实现了exactly once语义。
传统的spark streaming处理流程
在spark streaming中可能实现从数据源到计算的"exactly once",但在数据落地的时候,并不能。比如,数据处理完毕,写入redis的过程中,集群崩掉。那么重启后,这部份数据会丢掉或者重复消费。除非自己去实现。而在Structured streaming中都已经得到了较好的实现。
Structured streaming处理流程
话不多说,先来个官网的例子,最直观的感受
1 import org.apache.spark.sql.SparkSession 2 import org.apache.spark.sql.streaming.ProcessingTime 3 4 object structured{ 5 6 def main(args: Array[String]) { 7 val spark = SparkSession 8 .builder 9 .appName("StructuredNetworkWordCount") 10 .master("local") 11 .getOrCreate() 12 13 import spark.implicits._ 14 15 val ds1 = spark 16 .readStream 17 .format("kafka") 18 .option("kafka.bootstrap.servers", "master:9092") 19 .option("subscribe", "test2") 20 .load() 21 22 System.setProperty("hadoop.home.dir", "\\hadoop-common-2.2.0-bin-master") 23 spark.sparkContext.setCheckpointDir("/chekpoint") 24 25 val ds2 = ds1.selectExpr("CAST (value as STRING) ").as[String] 26 27 val words = ds2.as[String].flatMap(_.split(" ")) 28 29 val wordCounts = words.groupBy("value").count() 30 31 val query = words 32 .writeStream 33 .outputMode("append") 34 .format("console") 35 .trigger(ProcessingTime("11 seconds")) 36 .start() 37 38 query.awaitTermination() 39 } 40 41 }