Structured streaming是spark 2.0以后新增的用于实时处理的技术。与spark streaming不同的是,Structured streaming打开了数据源到数据落地之间的限制,它这两个端整合起来,形成真正的“流”,形成一张巨大的表。同时也正因为此特点,真正实现了exactly once语义。

传统的spark streaming处理流程

Structured streaming

在spark streaming中可能实现从数据源到计算的"exactly once",但在数据落地的时候,并不能。比如,数据处理完毕,写入redis的过程中,集群崩掉。那么重启后,这部份数据会丢掉或者重复消费。除非自己去实现。而在Structured streaming中都已经得到了较好的实现。

Structured streaming处理流程

Structured streaming

话不多说,先来个官网的例子,最直观的感受

 1 import org.apache.spark.sql.SparkSession
 2 import org.apache.spark.sql.streaming.ProcessingTime
 3 
 4 object structured{
 5 
 6   def main(args: Array[String]) {
 7     val spark = SparkSession
 8       .builder
 9       .appName("StructuredNetworkWordCount")
10       .master("local")
11       .getOrCreate()
12 
13     import spark.implicits._
14 
15     val ds1 = spark
16       .readStream
17       .format("kafka")
18       .option("kafka.bootstrap.servers", "master:9092")
19       .option("subscribe", "test2")
20       .load()
21 
22     System.setProperty("hadoop.home.dir", "\\hadoop-common-2.2.0-bin-master")
23     spark.sparkContext.setCheckpointDir("/chekpoint")
24 
25     val ds2 = ds1.selectExpr("CAST (value as STRING) ").as[String]
26 
27     val words = ds2.as[String].flatMap(_.split(" "))
28 
29     val wordCounts = words.groupBy("value").count()
30 
31     val query = words
32       .writeStream
33       .outputMode("append")
34       .format("console")
35       .trigger(ProcessingTime("11 seconds"))
36       .start()
37 
38     query.awaitTermination()
39   }
40 
41 }
View Code

相关文章:

  • 2021-12-02
  • 2022-12-23
  • 2021-08-16
  • 2021-12-10
  • 2021-07-26
  • 2021-06-03
  • 2021-12-12
猜你喜欢
  • 2021-11-08
  • 2019-12-22
  • 2022-01-09
  • 2021-05-19
  • 2021-08-09
相关资源
相似解决方案