1,编写FlumePushWordCount
运行程序,启动监听端口8866
2. cd /usr/share/flume,编写脚本
其中,在/usr/data.txt是一个空文件
写脚本data.sh
然后先运行
flume-ng agent -c conf -f conf/flumesparktest.conf -n agent -Dflume.root.logger=INFO,console
在运行
sh data.sh
此时会在idea中打印数据
//将spark-streaming-flume-sink_2.11-2.0.1.jar(本地库)
//scala-library-2.11.8.jar(linux下spark安装路径下的jars目录下)
//commons-lang3-3.3.2.jar(linux下spark安装路径下的jars目录下)
//放进flume的lib路径下
//重命名原来lib下的
//commons-lang-2.5.jar,scala-library-2.10.5.jar
//mv commons-lang-2.5.jar commons-lang-2.5.jarbak
//mv scala-library-2.10.5.jar scala-library-2.10.5.jarbak
object FlumePollwordCount {
def main(args: Array[String]): Unit = {
if (args.length!=2){
println(
“”"
|请输入
|1.要从哪个hostname上poll数据下来
|2.要从哪个port上poll数据下来
“”".stripMargin
)
System.exit(1)
}
val Array(hostname,port) = args
val conf = new SparkConf().setAppName("flume poll").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(10))
val flumePollStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt,StorageLevel.MEMORY_AND_DISK_SER)
flumePollStream.print()
ssc.start()
ssc.awaitTermination()
}
}
在/usr/share/flume/conf目录下,
cp flumesparktest.conf flumesparkpolltest.conf
修改其中的type和hostname(master的ip)
在flume目录下
./bin/flume-ng agent -c conf -f conf/flumesparkpolltest.conf -n agent -Dflume.root.logger=INFO,console
然后再运行
sh data.sh
在idea下运行程序