Setting up the environment
OS: CentOS 6.5
Flume version: 1.7
Three virtual machines:
Agent11: 192.168.80.120
Agent12: 192.168.80.121
Agent13: 192.168.80.122
Check whether the firewall has been disabled: service iptables status
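If iptables is still running, stop it (and keep it disabled across reboots) on all three machines, otherwise the avro port on Agent13 will be unreachable:
service iptables stop
chkconfig iptables off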
Configure the files under conf/ according to the design diagram.
This topology fits many scenarios. For example, to collect user-behavior logs from a website: for availability, the site runs as a load-balanced cluster, and every node produces its own user-behavior logs. You can deploy one Agent per node to collect that node's logs independently, and the multiple Agents then converge their data into a single storage system, such as HDFS.
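For the three machines above, the flow converges like this:

spooldir --> agent11 (192.168.80.120) --avro, port 4444--+
                                                         +--> agent13 (192.168.80.122) --> HDFS (hdfs://cluster1/flume/%Y%m%d)
spooldir --> agent12 (192.168.80.121) --avro, port 4445--+

Note that the Agent13 configuration below binds an avro source only on port 4444; for agent12's sink on port 4445 to be received, either a second avro source on 4445 must be added to agent13, or agent12's sink port changed to 4444.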
Agent11 and Agent12 use essentially the same configuration; only the component names and the sink port differ.
#Agent11
agent11.channels = ch11
agent11.sources = src11
agent11.sinks = sink11
#agent11 Spooling Directory Source
#source
agent11.sources.src11.type = spooldir
agent11.sources.src11.spoolDir = /home/hadoop/data/flume/spooldir
agent11.sources.src11.fileHeader = true
agent11.sources.src11.deletePolicy = immediate
#default: never. When to delete completed files: never or immediate. With immediate, ingested files are deleted from the spoolDir instead of being renamed with the .COMPLETED suffix.
agent11.sources.src11.batchSize = 1000
#default: 100. Granularity at which to batch transfer to the channel.
agent11.sources.src11.channels = ch11
agent11.sources.src11.deserializer.maxLineLength = 20480
#default: 2048. Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event.
#agent11 FileChannel
#channel
agent11.channels.ch11.type = file
agent11.channels.ch11.checkpointDir = /home/hadoop/data/flume/checkpointDir
#The directory where the checkpoint file will be stored.
agent11.channels.ch11.dataDirs = /home/hadoop/data/flume/dataDirs
#Comma-separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance.
agent11.channels.ch11.capacity = 200000000
#default: 1000000. Maximum capacity of the channel, measured in events (not bytes).
agent11.channels.ch11.keep-alive = 30
#default: 3. Amount of time (in seconds) to wait for a put operation.
agent11.channels.ch11.write-timeout = 30
#Not listed in the Flume 1.7 documentation.
agent11.channels.ch11.checkpoint-timeout = 600
#Also not in Flume 1.7; the documented property there is checkpointInterval (default: 30000), the amount of time (in millis) between checkpoints.
#agent11 Sinks
#Note (3)
agent11.sinks.sink11.channel = ch11
agent11.sinks.sink11.type = avro
agent11.sinks.sink11.request-timeout = 30000
#connect to the collector agent (Agent13)
agent11.sinks.sink11.hostname = 192.168.80.122
agent11.sinks.sink11.port = 4444
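Once the collector (Agent13) is up, this agent can be launched with the standard flume-ng command. The file name conf/agent11.conf is an assumption (use whatever name you saved the config under), and --name must match the agent11 property prefix:
bin/flume-ng agent --conf conf --conf-file conf/agent11.conf --name agent11 -Dflume.root.logger=INFO,console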
#Agent12
agent12.channels = ch12
agent12.sources = src12
agent12.sinks = sink12
#agent12 Spooling Directory Source
#Note (1)
agent12.sources.src12.type = spooldir
agent12.sources.src12.spoolDir = /home/hadoop/data/flume/spooldir
agent12.sources.src12.fileHeader = true
agent12.sources.src12.deletePolicy = immediate
agent12.sources.src12.batchSize = 1000
agent12.sources.src12.channels = ch12
agent12.sources.src12.deserializer.maxLineLength = 20480
#agent12 FileChannel
#Note (2)
agent12.channels.ch12.type = file
agent12.channels.ch12.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent12.channels.ch12.dataDirs = /home/hadoop/data/flume/dataDirs
agent12.channels.ch12.capacity = 200000000
agent12.channels.ch12.keep-alive = 30
agent12.channels.ch12.write-timeout = 30
agent12.channels.ch12.checkpoint-timeout = 600
#agent12 Sinks
#Note (3)
agent12.sinks.sink12.channel = ch12
agent12.sinks.sink12.type = avro
agent12.sinks.sink12.request-timeout = 30000
#connect to the collector agent (Agent13)
agent12.sinks.sink12.hostname = 192.168.80.122
agent12.sinks.sink12.port = 4445
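Before starting agent11 and agent12, create the source and channel directories on each machine; the spooling directory source does not create spoolDir itself:
mkdir -p /home/hadoop/data/flume/spooldir
mkdir -p /home/hadoop/data/flume/checkpointDir /home/hadoop/data/flume/dataDirs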
The Agent13 configuration file is as follows:
#agent13
agent13.channels = ch13
agent13.sources = src13
agent13.sinks = sink13
#agent13 Avro Source
#Note (4)
agent13.sources.src13.type = avro
agent13.sources.src13.channels = ch13
agent13.sources.src13.bind = 192.168.80.122
agent13.sources.src13.port = 4444
agent13.sources.src13.batchSize = 100000
#agent13 FileChannel
agent13.channels.ch13.type = file
agent13.channels.ch13.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent13.channels.ch13.dataDirs = /home/hadoop/data/flume/dataDirs
agent13.channels.ch13.capacity = 200000
agent13.channels.ch13.transactionCapacity = 200000
agent13.channels.ch13.keep-alive = 30
agent13.channels.ch13.write-timeout = 30
agent13.channels.ch13.checkpoint-timeout = 600
##agent13 MemoryChannel (alternative, commented out)
#agent13.channels.ch13.type = memory
#agent13.channels.ch13.capacity = 10000
#agent13.channels.ch13.transactionCapacity = 10000
#agent13.channels.ch13.byteCapacityBufferPercentage = 20
#agent13.channels.ch13.byteCapacity = 800000
#agent13 Sinks
#Note (5)
agent13.sinks.sink13.channel = ch13
agent13.sinks.sink13.type = hdfs
#The component type name; needs to be hdfs.
agent13.sinks.sink13.hdfs.useLocalTimeStamp = true
#default: false. Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
agent13.sinks.sink13.hdfs.rollSize = 4000000
#File size to trigger a roll, in bytes (0: never roll based on file size).
agent13.sinks.sink13.hdfs.path = hdfs://cluster1/flume/%Y%m%d
#HDFS directory path (e.g. hdfs://namenode/flume/webdata/); the %Y%m%d escape sequences expand to the event's date, e.g. /flume/20170101.
agent13.sinks.sink13.hdfs.filePrefix = FlumeData
#default: FlumeData. Name prefixed to files created by Flume in the HDFS directory.
agent13.sinks.sink13.hdfs.minBlockReplicas = 1
#Specify the minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
#Why do the roll settings only take effect once this is set to 1? Most likely because the HDFS sink rolls a file whenever it sees its block as under-replicated; with the default replication factor that check can fire constantly and override rollSize/rollInterval, whereas minBlockReplicas=1 keeps it satisfied.
#agent13.sinks.sink13.hdfs.fileType = DataStream
#default: SequenceFile. File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
agent13.sinks.sink13.hdfs.writeFormat = Text
#Format for sequence file records. One of "Text" or "Writable" (the default).
agent13.sinks.sink13.hdfs.rollInterval = 200
#Number of seconds to wait before rolling the current file (0 = never roll based on time interval).
#agent13.sinks.sink13.hdfs.rollSize = 0
#File size to trigger a roll, in bytes (0: never roll based on file size).
agent13.sinks.sink13.hdfs.rollCount = 0
#Number of events written to a file before it is rolled (0 = never roll based on number of events).
agent13.sinks.sink13.hdfs.idleTimeout = 0
#Timeout after which inactive files get closed (0 = disable automatic closing of idle files).
agent13.sinks.sink13.hdfs.callTimeout = 10000
#Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
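Start agent13 before the two upstream agents so its avro source is already listening when their sinks connect; the config file name below is again an assumption:
bin/flume-ng agent --conf conf --conf-file conf/agent13.conf --name agent13 -Dflume.root.logger=INFO,console
To verify the pipeline end to end, copy a log file into the spooling directory on 192.168.80.120 or 192.168.80.121 and list that day's output directory on HDFS (test.log and the date are examples):
cp test.log /home/hadoop/data/flume/spooldir/
hdfs dfs -ls /flume/20170101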