1,编写FlumePushWordCount
spark 连接flume传输数据
spark 连接flume传输数据
运行程序,启动监听端口8866
2. cd /usr/share/flume,编写脚本
spark 连接flume传输数据
其中,在/usr/data.txt是一个空文件
写脚本data.sh
spark 连接flume传输数据
然后先运行
flume-ng agent -c conf -f conf/flumesparktest.conf -n agent -Dflume.root.logger=INFO,console
在运行
sh data.sh
此时会在idea中打印数据

//将spark-streaming-flume-sink_2.11-2.0.1.jar(本地库)
//scala-library-2.11.8.jar(linux下spark安装路径下的jars目录下)
//commons-lang3-3.3.2.jar(linux下spark安装路径下的jars目录下)
//放进flume的lib路径下
//重命名原来lib下的
//commons-lang-2.5.jar,scala-library-2.10.5.jar
//mv commons-lang-2.5.jar commons-lang-2.5.jarbak
//mv scala-library-2.10.5.jar scala-library-2.10.5.jarbak

object FlumePollwordCount {

def main(args: Array[String]): Unit = {
if (args.length!=2){
println(
“”"
|请输入
|1.要从哪个hostname上poll数据下来
|2.要从哪个port上poll数据下来
“”".stripMargin
)
System.exit(1)
}

val Array(hostname,port) = args
val conf = new SparkConf().setAppName("flume poll").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(10))
val flumePollStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt,StorageLevel.MEMORY_AND_DISK_SER)

flumePollStream.print()

ssc.start()
ssc.awaitTermination()

}

}

在/usr/share/flume/conf目录下,
cp flumesparktest.conf flumesparkpolltest.conf
修改其中的type和hostname(master的ip)
spark 连接flume传输数据
在flume目录下
./bin/flume-ng agent -c conf -f conf/flumesparkpolltest.conf -n agent -Dflume.root.logger=INFO,console

然后再运行
sh data.sh

在idea下运行程序

相关文章: