1. Environment setup
Download a 2.12.x version of Scala and add it as an SDK in IDEA.
2. Project setup
Create a Maven project and add the following to the pom file:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
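If IDEA does not already put the Scala SDK from step 1 on the project classpath, the matching Scala library can also be declared inside the same <dependencies> block above (the exact 2.12.x patch version is an assumption; it should match the SDK installed in step 1):

```xml
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <!-- assumption: pick the 2.12.x version matching your installed SDK -->
    <version>2.12.10</version>
</dependency>
```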
3. Writing the code
// Required imports (from the dependencies declared in step 2)
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Initialize the Spark Streaming context with a 5-second batch interval
val conf = new SparkConf().setAppName("appName").setMaster("local")
val ssc = new StreamingContext(conf, Seconds(5))
// Set the Kafka connection parameters
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "ip:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)
val topics = Array("topicName")
// Create the DStream using the direct (receiver-less) approach
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
// Do the computation and print the result to the console
val value = stream.map(_.value())
value.countByValue().print()
// Start the computation and wait for it to terminate
ssc.start()
ssc.awaitTermination()
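Before wiring up a real broker, the same pipeline can be smoke-tested locally by replacing the Kafka source with a queue of RDDs via StreamingContext.queueStream. This is a minimal sketch (the input words and timeout are arbitrary), not part of the original program:

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Feed one hand-made batch of words instead of reading from Kafka
    val queue = mutable.Queue(ssc.sparkContext.makeRDD(Seq("spark", "kafka", "spark")))
    val words = ssc.queueStream(queue)
    // Same computation as the Kafka job: count occurrences per batch
    words.countByValue().print()
    ssc.start()
    // Run a few batches, then shut down instead of blocking forever
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop()
  }
}
```

Once the counts print as expected here, the queueStream source can be swapped back for the createDirectStream call above.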
4. Run and verify
After the program starts, use the Kafka command-line producer to push data to the topic. Within one batch interval of the streaming job, push several words in a row, then watch the computed counts printed to the console: each distinct word appears with the number of times it was pushed in that batch.
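Per batch, countByValue() produces one (word, count) pair for each distinct word. As a plain-Scala illustration of that semantics (the input words are hypothetical):

```scala
// Hypothetical words pushed to the topic within one 5-second batch interval
val batch = Seq("spark", "kafka", "spark", "streaming", "spark")
// Per batch, DStream.countByValue() yields the same pairs as:
val counts = batch.groupBy(identity).mapValues(_.size.toLong).toMap
println(counts("spark"))      // 3
println(counts("kafka"))      // 1
println(counts("streaming"))  // 1
```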