Flume
flume收集数据资源,集中存储,具有分布式,高可用,配置工具。原理:基于数据流
Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。
架构
Flume的架构主要有一下几个核心概念:
· Event:一个数据单元,带有一个可选的消息头
· Flow:Event从源点到达目的点的迁移的抽象
· Client:操作位于源点处的Event,将其发送到Flume Agent
· Agent:一个独立的Flume(JVM)进程,包含组件Source、Channel、Sink
· Source:用来消费传递到该组件的Event
· Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
· Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent
数据流模型
多个agent数据流
数据流复用
数据流合并
配置文件
#分布各个集群配置
#main infos
agent1.sources = spoolSource
agent1.channels = memoryChannel
agent1.sinks = avroSink
# source
agent1.sources.spoolSource.type = spooldir
agent1.sources.spoolSource.spoolDir =/tmp/test/flume/data
agent1.sources.spoolSource.channels = memoryChannel
agent1.sources.spoolSource.deserializer.maxLineLength =50000
# channel
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 10000
agent1.channels.memoryChannel.transactionCapacity = 10000
agent1.channels.memoryChannel.byteCapacityBufferPercentage= 20
agent1.channels.memoryChannel.byteCapacity = 0
# Avro Sink
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.hostname = 192.168.50.8
agent1.sinks.avroSink.port = 22222
agent1.sinks.avroSink.channel = memoryChannel
#直接入存储中心时配置如下:
#配置hbase sink2
agent1.sinks.sink2.type = hbase
agent1.sinks.sink2.channel = channel1
agent1.sinks.sink2.table = hmbbs
agent1.sinks.sink2.columnFamily = cf
agent1.sinks.sink2.serializer =flume.HmbbsHbaseEventSerializer
agent1.sinks.sink2.serializer.suffix = timestamp
agent1.sinks.sink2.serializer =org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# 配置hdfs sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = ch2
agent1.sinks.sink1.hdfs.path =hdfs://hadoop0:9000/flume/%Y-%m-%d/
agent1.sinks.sink1.hdfs.rollInterval=1
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
#总集群配置
# main infos
agentpool.sources = avroSource
agentpool.channels = memoryChannel
agentpool.sinks = sparkSink
# Avro Source
agentpool.sources.avroSource.type = avro
agentpool.sources.avroSource.bind = 192.168.50.8
agentpool.sources.avroSource.port = 22222
#agentpool.sources.avroSource.threads = 3
agentpool.sources.avroSource.channels = memoryChannel
# channel
agentpool.channels.memoryChannel.type = memory
agentpool.channels.memoryChannel.capacity = 1000000
agentpool.channels.memoryChannel.transactionCapacity =1000000
agentpool.channels.memoryChannel.byteCapacityBufferPercentage= 20
agentpool.channels.memoryChannel.byteCapacity = 0
# Spark Sink (这里的主机和端口与spark streaming程序中的相同)
agentpool.sinks.sparkSink.type =org.apache.spark.streaming.flume.sink.SparkSink
agentpool.sinks.sparkSink.hostname = 192.168.50.8
agentpool.sinks.sparkSink.port = 55555
agentpool.sinks.sparkSink.channel = memoryChannel
命令
#启动代理的脚本是flume-ng agent,需要指定agent name、配置目录、配置文件
-n 指定agent名称
-c 指定配置文件目录
-f 指定配置文件
-Dflume.root.logger=DEBUG,console
bin/flume-ng agent –n agent1 –c conf –f conf/example–Dflume.root.logger=DEBUG,console
nohup flume-ng agent -c conf/ -fconf/flume-sparkstreaming.conf -n agentpool
–Dflume.root.logger=DEBUG,console
Flume Source 支持的类型:
|
Source类型 |
说明 |
|
Avro Source |
支持Avro协议(实际上是Avro RPC),内置支持 |
|
Thrift Source |
支持Thrift协议,内置支持 |
|
Exec Source |
基于Unix的command在标准输出上生产数据 |
|
JMS Source |
从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过 |
|
Spooling Directory Source |
监控指定目录内数据变更 |
|
Twitter 1% firehose Source |
通过API持续下载Twitter数据,试验性质 |
|
Netcat Source |
监控某个端口,将流经端口的每一个文本行数据作为Event输入 |
|
Sequence Generator Source |
序列生成器数据源,生产序列数据 |
|
Syslog Sources |
读取syslog数据,产生Event,支持UDP和TCP两种协议 |
|
HTTP Source |
基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 |
|
Legacy Sources |
兼容老的Flume OG中Source(0.9.x版本) |
Flume Channel 支持的类型:
|
Channel类型 |
说明 |
|
Memory Channel |
Event数据存储在内存中 |
|
JDBC Channel |
Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
|
File Channel |
Event数据存储在磁盘文件中 |
|
Spillable Memory Channel |
Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
|
Pseudo Transaction Channel |
测试用途 |
|
Custom Channel |
自定义Channel实现 |
Flume Sink支持的类型
|
Sink类型 |
说明 |
|
HDFS Sink |
数据写入HDFS |
|
Logger Sink |
数据写入日志文件 |
|
Avro Sink |
数据被转换成Avro Event,然后发送到配置的RPC端口上 |
|
Thrift Sink |
数据被转换成Thrift Event,然后发送到配置的RPC端口上 |
|
IRC Sink |
数据在IRC上进行回放 |
|
File Roll Sink |
存储数据到本地文件系统 |
|
Null Sink |
丢弃到所有数据 |
|
HBase Sink |
数据写入HBase数据库 |
|
Morphline Solr Sink |
数据发送到Solr搜索服务器(集群) |
|
ElasticSearch Sink |
数据发送到Elastic Search搜索服务器(集群) |
|
Kite Dataset Sink |
写数据到Kite Dataset,试验性质的 |
|
Custom Sink |
自定义Sink实现 |