flume的整体基础框架如下所示:1.flume相关配置以及配置含义

#定义三大组件的名称

1.其中source是flume日志采集的起点,监控文件系统目录,其中比较常见的是Spooling Directory Source,来进行一个数据因为意外情况数据采集中断,恢复过后,从异常中断位置继续采集。

2.flume的utf配置是默认按照utf-8来进行配置的。


agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

#关于配置channel ,channel是flume的中间数据缓存管道类似kafka的机制,可采用的方式是menmory channel,原因是数据量大,需要极大的数据吞吐量和速度。

一旦flume进程down掉,没有续点传递的机制,但是使用基于内存,吞吐率会变大。

# 配置source组件
agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /home/nginx/logs/logHistory
#agent1.sources.source1.spoolDir = /home/nginx/logs/logtest
agent1.sources.source1.spoolDir = /home/nginx/logs/logHistory
agent1.sources.source1.ignorePattern = ^error.*\.log$
#agent1.sources.source1.ignorePattern = ^(.)*\\.tmp$
#Agent1.sources.spooldir-source1.ignorePattern=^.*\.tmp$
agent1.sources.source1.fileHeader = false
#配置拦截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
#agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# 配置sink组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://192.168.100.21:9000/weblog/%Y%m%d/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100

#batchSize:这个参数是采用exec source时,含义是一次读入channel的数据行数

#当采用spooling Directory source时粒度,也就是evnets(flume处理最小的处理数据单元数)
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#agent1.sinks.sink1.hdfs.fileSuffix = COMPLETED
#滚动生成的文件按大小生成
agent1.sinks.sink1.hdfs.rollSize = 135000000
#滚动生成的文件按行数生成
agent1.sinks.sink1.hdfs.rollCount = 1000000
#滚动生成的文件按时间生成
agent1.sinks.sink1.hdfs.rollInterval = 60
#开启滚动生成目录
agent1.sinks.sink1.hdfs.round = true
#以10为一梯度滚动生成
agent1.sinks.sink1.hdfs.roundValue = 10
#单位为分钟
agent1.sinks.sink1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory

使用缓冲内存的内存通道
#agent1.channels.channel1.type = memory #使用来缓存数据的地方,内存。
#agent1.channels.channel1.capacity = 500000 允许存储在channel的evenets的最大数量
#agent1.channels.channel1.transactionCapacity = 600 每次数据由channel缓存到sink传输的最大events的数量
#agent1.channels.channel1.keep-alive = 120 

#agent1.channels.channel1.byteCapacity=23456 这个是该channel的内存大小,单位是byte。

agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir = /home/flume/flume-1.9.0/access_checkpoint
agent1.channels.channel1.dataDirs = /home/flume/flume-1.9.0/access_data

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

sink组件的核心工作是把channel中的数据进行输出到特定的终端,比如hdfs,以及hbase,database,和avro等等.

因此对于sink的这块儿的核心优化是优化各个终端(hdfs,hbase,database,以及avro)的数据插入性能。

整体的架构师分析如下:

三个组件的顺序是不能去进行颠倒的,组件的数量是可以自己去定义的,一个flume agent就好比一个进程,一个source就可以类比为组件中一个组件,但是这里需要去注意的是可以有多个sink,也就是目标数据源,每个sink又可以是多个线程,(具体的参数是thread Poolsize)

JAVA的内存设计:

主要是通过更改conf/flume-env.sh文件来进行实现的。有引入JAVA OPTS这个目录进行相关的配置。

1.flume相关配置以及配置含义

 

目前三个source ,channe,以及sink三个组件比较大的一个问题是,三个组件之间

互相传递数据速度是不一致的,这是需要去严格限制和摒除的,在大数据量的问题的时候

会影响到整个系统的稳定性。

 

1.flume相关配置以及配置含义

 

 

 

 


 

相关文章: