1. Environment Setup

Download Scala 2.12.x and add it to IDEA as an SDK.

Goal: use Spark Streaming to read data from Kafka and do real-time computation.

2. Project Setup

Create a Maven project and add the following to the pom.xml:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
  </plugins>
</build>
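The `_2.12` suffix on the Spark artifacts must match the Scala binary version installed in step 1; a mismatch fails at runtime with linkage errors. A quick sanity-check sketch (the object and method names here are my own, not a Spark or Maven API):

```scala
// Sketch: confirm the running Scala binary version matches the _2.12
// artifact suffix. scala.util.Properties.versionNumberString returns the
// full version, e.g. "2.12.10".
object VersionCheck {
  // "2.12.10" -> "2.12": the first two components form the binary version
  def binaryVersion(full: String): String =
    full.split("\\.").take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    val running = binaryVersion(scala.util.Properties.versionNumberString)
    println(s"Running Scala binary version: $running (artifacts expect 2.12)")
  }
}
```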

3. Writing the Code

//imports needed by the snippet below
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

//initialize the Spark Streaming context with a 5-second batch interval
//(use local[*] rather than local so the local run has more than one thread)
val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))

//Kafka connection parameters
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "ip:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val topics = Array("topicName")

//obtain a DStream via the direct (receiver-less) approach
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

//compute and print the results to the console
val value = stream.map(_.value())
value.countByValue().print()

//start the computation and wait for it to terminate
ssc.start()
ssc.awaitTermination()
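Note that countByValue counts whole message values, not individual words: each batch yields (value, count) pairs for the distinct messages seen in that interval. A minimal pure-Scala sketch of that per-batch semantics (the object and method names are my own, not the Spark API):

```scala
// Sketch of DStream.countByValue semantics for one batch (assumption:
// equal values are grouped and emitted as (value, count) pairs).
object CountByValueDemo {
  def countByValue(batch: Seq[String]): Map[String, Long] =
    batch.groupBy(identity).map { case (v, occ) => (v, occ.size.toLong) }

  def main(args: Array[String]): Unit = {
    // Pushing the same message three times within one batch interval...
    val batch = Seq("spark", "kafka", "spark", "spark")
    // ...yields that value with count 3 in the batch's output.
    countByValue(batch).foreach(println)
  }
}
```

To count individual words instead, you would flatMap each message on whitespace before counting.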

4. Run and Verify

After the program starts, use the Kafka console producer to push data to the topic. Within one Spark Streaming batch interval, push the same message several times in a row and watch the computed result in the console.

For example, push repeatedly:

SparkStreaming读取kafka数据做实时计算

The console then prints each distinct message value together with its count for that batch, e.g. a pair of the form

(SparkStreaming读取kafka数据做实时计算, n)

where n is the number of times the message was pushed within the interval.
