◆ 构建第一个Streaming程序: (wordCount)
◆ Spark Streaming 程序最好以使用Maven或者sbt编译出来的独立应用的形式运行。
◆ 准备工作:
1.引入Spark Streaming的jar
2.scala流计算import声明
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
1.初始化StreamingContext对象
//创建一个本地StreamingContext两个工作线程和批间隔1秒。
val conf = new SparkConf()
conf.setMaster(“local[2]")
conf.setAppName(“ NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
2.获取DStream对象
//创建一个连接到主机名的DStream,像localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
3.操作DStream对象
//将每一行接收到的数据通过空格分割成单词
val words = lines.flatMap(_.split(" “))
//导入StreamingContext中的隐式转换
import org.apache.spark.streaming.StreamingContext._
// 对每一批次的单词进行转化求和
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 每个批次中默认打印前十个元素到控制台
wordCounts.print()
4.启动流处理程序
ssc.start// 开始计算
ssc.awaitTermination() // 等待计算终止
ssc.stop() //结束应用
启动网络端口,模拟发送数据
1.借助于nc命令,手动输入数据
Linux/Mac :nc
Windows:cat
nc -lk 9999
2.借助于代码,编写一个模拟数据发生器
package com.briup.streaming import java.io.PrintWriter import java.net.ServerSocket import scala.io.Source object MassageServer { // 定义随机获取整数的方法 def index(length: Int) = { import java.util.Random val rdm = new Random rdm.nextInt(length) } def main(args: Array[String]) { println("模拟数据器启动!!!") // 获取指定文件总的行数 val filename ="Spark/ihaveadream.txt"; val lines = Source.fromFile(filename).getLines.toList val filerow = lines.length // 指定监听某端口,当外部程序请求时建立连接 val serversocket = new ServerSocket(9999); while (true) { //监听9999端口,获取socket对象 val socket = serversocket.accept() // println(socket) new Thread() { override def run = { println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(1000) // 当该端口接受请求时,随机获取某行数据发送给对方 val content = lines(index(filerow)) println (content) out.write(content + '\n') out.flush() } socket.close() } }.start() } } }