flume的整体基础框架如下所示:
#定义三大组件的名称
1.其中source是flume日志采集的起点,监控文件系统目录,其中比较常见的是Spooling Directory Source,来进行一个数据因为意外情况数据采集中断,恢复过后,从异常中断位置继续采集。
2.flume的utf配置是默认按照utf-8来进行配置的。
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
#关于配置channel ,channel是flume的中间数据缓存管道类似kafka的机制,可采用的方式是menmory channel,原因是数据量大,需要极大的数据吞吐量和速度。
一旦flume进程down掉,没有续点传递的机制,但是使用基于内存,吞吐率会变大。
# 配置source组件
agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /home/nginx/logs/logHistory
#agent1.sources.source1.spoolDir = /home/nginx/logs/logtest
agent1.sources.source1.spoolDir = /home/nginx/logs/logHistory
agent1.sources.source1.ignorePattern = ^error.*\.log$
#agent1.sources.source1.ignorePattern = ^(.)*\\.tmp$
#Agent1.sources.spooldir-source1.ignorePattern=^.*\.tmp$
agent1.sources.source1.fileHeader = false
#配置拦截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
#agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# 配置sink组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://192.168.100.21:9000/weblog/%Y%m%d/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
#batchSize:这个参数是采用exec source时,含义是一次读入channel的数据行数
#当采用spooling Directory source时粒度,也就是evnets(flume处理最小的处理数据单元数)
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#agent1.sinks.sink1.hdfs.fileSuffix = COMPLETED
#滚动生成的文件按大小生成
agent1.sinks.sink1.hdfs.rollSize = 135000000
#滚动生成的文件按行数生成
agent1.sinks.sink1.hdfs.rollCount = 1000000
#滚动生成的文件按时间生成
agent1.sinks.sink1.hdfs.rollInterval = 60
#开启滚动生成目录
agent1.sinks.sink1.hdfs.round = true
#以10为一梯度滚动生成
agent1.sinks.sink1.hdfs.roundValue = 10
#单位为分钟
agent1.sinks.sink1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
使用缓冲内存的内存通道
#agent1.channels.channel1.type = memory #使用来缓存数据的地方,内存。
#agent1.channels.channel1.capacity = 500000 允许存储在channel的evenets的最大数量
#agent1.channels.channel1.transactionCapacity = 600 每次数据由channel缓存到sink传输的最大events的数量
#agent1.channels.channel1.keep-alive = 120
#agent1.channels.channel1.byteCapacity=23456 这个是该channel的内存大小,单位是byte。
agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir = /home/flume/flume-1.9.0/access_checkpoint
agent1.channels.channel1.dataDirs = /home/flume/flume-1.9.0/access_data
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
sink组件的核心工作是把channel中的数据进行输出到特定的终端,比如hdfs,以及hbase,database,和avro等等.
因此对于sink的这块儿的核心优化是优化各个终端(hdfs,hbase,database,以及avro)的数据插入性能。
整体的架构师分析如下:
三个组件的顺序是不能去进行颠倒的,组件的数量是可以自己去定义的,一个flume agent就好比一个进程,一个source就可以类比为组件中一个组件,但是这里需要去注意的是可以有多个sink,也就是目标数据源,每个sink又可以是多个线程,(具体的参数是thread Poolsize)
JAVA的内存设计:
主要是通过更改conf/flume-env.sh文件来进行实现的。有引入JAVA OPTS这个目录进行相关的配置。
目前三个source ,channe,以及sink三个组件比较大的一个问题是,三个组件之间
互相传递数据速度是不一致的,这是需要去严格限制和摒除的,在大数据量的问题的时候
会影响到整个系统的稳定性。