目录:
- Flume核心组件介绍:source、channel、sink
- Flume可靠性
- event基本概念
- Flume拦截器简介和基本使用
- Flume选择器简介和基本使用
- Flume和kafka整合
- Flume故障转移
- Flume负载均衡
一.flume核心组件介绍
- source:
source fan-out:指的是一个source可以指向多个channel;
一个source可以指向多个channel,
一个sink只能被一个channel指向。
source类型:https://www.cnblogs.com/swordfall/p/8254271.html - channel
- 介绍
- 中转Event的一个临时存储,保存有source组件传递过来的Event。
- 常用的channel分别有:MemeoryChannel、File Channel、JDBC Channel(现在很少使用,只支持DB储存)
- File Channel是一个持久化的隧道(channel),它持久化所有的事件,并将其存储到磁盘中,不会造成数据丢失。
- Memory Channel是一个不稳定的隧道,其原因是由于它在内存中储存所有的时间。内存的空间受到RAM大小的限制。(关机、停电、内存溢出都会造成数据丢失)
- 类型
- 测试:File Channel
- 在配置文件中修改参数:在官网有实例:http://flume.apache.org/documentation.html
- 启动flume
- 发送数据,使用Java的方式
- 在对应的数据储存目录查看数据
- 在配置文件中修改参数:在官网有实例:http://flume.apache.org/documentation.html
- 介绍
- sink
- 介绍
- 从Channel中读取并移除Event,将Event传递相应的目的地。
- 可以向文件系统、数据库、hadoop存储数据。
在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据;在日志较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析
- 类型
- 测试1:HDFS Sink
- 开启hdfs
- 修改配置文件,在官网有实例:http://flume.apache.org/documentation.html
- 启动flume
- 在hdfs中查看
- 开启hdfs
- 测试2:两个agent节点测试:那么第一个的sink和其相连的source的type方式需要对应才能连接,如图
测试步骤- 在第一个agent的配置文件中配置sink,在官网有实例:http://flume.apache.org/documentation.html
- 在第二个agent的配置文件中配置与第一个相连的source,在官网有实例:http://flume.apache.org/documentation.html
- 开启第一个agent
- 开启第二个agent
- 在第一个agent发送数据
- 查看:在hdfs中可以查看第一个agent发送的数据
- 在第一个agent的配置文件中配置sink,在官网有实例:http://flume.apache.org/documentation.html
- 测试3:将数据储存到kafka
- 开启zookeeper
- 开启kfaka
- 修改flume配置文件,在官网有实例:http://flume.apache.org/documentation.html
- 开启flume
- 测试,flume发送数据,会在kafka消费者看到消费的数据
- 介绍
二.flume可靠性
1.flume可靠性的原因
2.flume可靠性说明
- Sink必须在Event被存入Channel后,或者,已经被传到到下一站agent里,又或者,已经被存到外部数据目的地之后,才能把Event从Channel中remove掉,这样数据流里的Event无论是在一个agent里还是多个agent之间流转,就能保证可靠,因为以上的事务保证了Event会被成功存储起来。
- 而且Channel的多种实现在可恢复性上有不同的保证,也保证了event不同程度的可靠性。比如Flume支持在本地保存一份文件Channel作为备份,而memory channel将event放在内存queue里,速度块,但丢失的会无法恢复。
3.示例
三.Event基本概念
- Event是Flume数据传输的基本单元。Flume以Event的形式将数据从源头传送到最终目的。Event由可选的Headers和载有数据的一个byte数据构成。
- 载有的数据对flume是不透明的。
- Headers是容纳了key-value字符串对的无序集合,key在集合内是唯一的。
- Headers可以在上下文路由中使用扩展。
public interface Event{ public Map<String,String> getHeaders(); public void setHeaders(Map<String,String> headers); public byte[] getBody(); public void setBody(Byte[] body); }
四.Flume拦截器简介和基本使用
-
简介
说明:链式编程,类似拉链一样 -
分类
- 拦截过程
- avro - source中配置ip和ip通过规则拦截:官网查看
- Host Interceptor
测试步骤:- 创建配置文件
- 重新启动flume
第一步:杀死进程
第二步:再次启动 - 测试
结果:另起一个窗口
- 创建配置文件
- Regex Filtering Interceptor
测试步骤:- 创建配置文件:没有带{}的数据不能通过
- 重新启动flume
- 测试
1.发送
2.查看:只接收到了{hello},java 就被拦截了
- 创建配置文件:没有带{}的数据不能通过
五.Flume选择器简介和基本使用
1.简介
2.说明:
3.测试:复制
步骤:
- 创建配置文件:配置两个channel、两个sink
文件名: - 启动
- 测试: 发送数据,然后查看
4.测试:分发
组件解析:
- 创建配置文件
- 启动
- 测试:发送数据、查看数据储存路径
六.Flume和kafka整合
七.Flume故障转移
1.故障转移概念
2.解析:
3.测试步骤:
- 创建3个agent,所以对应3个配置文件
avro01.conf
avro02.conf
netcat_avro.conf,官网实例如下
具体如下:第16行,权重最大,优先级越高(也就是优先接收数据) - 启动:打开3个窗口分别启动agent
- 测试
第一步:在配置文件为netcat_avro.conf的flume服务器中发送数据
第二步:可以在配置文件为avro02.conf的flume服务器中看到接收的数据
第三步:关闭配置文件为avro02.conf的flume服务器
找到此服务的进程号,kill
第四步:在配置文件为netcat_avro.conf的flume服务器中继续发送数据
第五步:可以在在配置文件为avro01.conf的flume服务器中看到接收的数据
八.负载均衡
1.负载均衡概念
2.解析:
3.测试:在故障转移的实例上测试,只需要修改netcat_avro.conf配置文件即可,同参考官方文档
- 修改netcat_avro.conf配置文件
可参考官方文档:
具体参数: - 打开3个xshell窗口分别启动3个配置文件对应的flume
- 测试:netcat_avro.conf配置文件的flume服务器发送多条数据,可以在01,02中都看到部分数据