- 接收事件;
- 处理事件;
- 将事件传递给拦截器链;
- 将每个事件传递给Channel选择器;
- 返回写入事件的Channel列表;
- 将所有事件写入每个必需的Channel,只有一个事务被打开;对于每个Channel,所有事件都写为事务的一部分;
- 利用可选Channel重复相同动作。
Flume本身不限制Agent中Source、Channel和Sink的数量。因此Flume Source可以接收事件,并可以通过配置将事件复制到多个目的地。这使得Source通过Channel处理器、拦截器和Channel选择器,写入数据到Channel成为可能。
每个Source都有自己的Channel处理器。每次Source将数据写入Channel,它是通过委派该任务到其Channel处理器来完成的。然后,Channel处理器将这些事件传到一个或多个Source配置的拦截器中。
拦截器是一段代码,可以基于某些它完成的处理来读取时间和修改或删除时间。基于某些标准,如正则表达式,拦截器可以用来删除事件,为事件添加新报头或移除现有的报头等。每个Source可以配置成使用多个拦截器,按照配置中定义的顺序被调用,将拦截器的结果传递给链的下一个单元。这就是所谓的责任链的设计模式。一旦拦截器处理完事件,拦截器链返回的事件列表传递到Channel列表,即通过Channel选择器为每个事件选择的Channel。
Source可以通过处理器-拦截器-选择路由器写入多个Channel。Channel选择器是决定每个事件必须写入到Source附带的哪个Channel的组件。因此拦截器可以用来插入或删除事件中的数据,这样Channel选择器可以应用一些条件在这些事件上,来决定事件必须写入哪些Channel。Channel选择器可以对事件应用任意过滤条件,来决定每个事件必须写入哪些Channel,以及哪些Channel是必需的或可选的。
写入到必需的Channel失败将会导致Channel处理器抛出ChannelException,表明Source必须重试该事件(实际上,所有的事件都在那个事务中),而未能写入可选Channel失败仅仅忽略它。一旦写出事件,处理器将会对Source指示成功状态,可能发送确认(ACK)给发送该事件的系统,并继续接受更多的事件。
1. 拦截器
拦截器(Interceptor)是简单插件式组件,设置在Source和Source写入数据的Channel之间。Source接收到的事件在写入到对应的Channel之前,拦截器都可以转换或删除这些事件。每个拦截器实例只处理同一个Source接收到的事件。拦截器可以基于任意标准或转换事件,但是拦截必须返回尽可能多(或尽可能少)的事件,如同原始传递过来的事件。
多个拦截器组成一个有序的拦截器链。在一个链条中,可以添加任意数量的拦截器去转换从单个Source中来的事件。Source将同一个事务的所有事件传递给Channel处理器,进而传递给拦截器链条,然后事件被传递给链条中的第一个拦截器。通过拦截器转换时间产生的一系列事件,传递到链条的下一个拦截器,以此类推。链条最后一个拦截器返回的最终事件列表写入到Channel中。
因为拦截器必须在事件写入Channel之前完成转换操作,只有当拦截器已成功转换事件后,RPC Source(和任何其他可能产生超时的Source)才会响应发送事件的客户端或Sink。因此,在拦截器中进行大量重量级的处理并不是一个好主意。如果拦截器中的处理时重量级的、耗时的,那么需要相应地调整超时时间属性。
Flume配置文件中,所有拦截器通用的唯一配置参数是type参数,该参数必须是拦截器的别名或者Builder类的完全限定类名(FQCN),该Builder类用于创建拦截器。正如前面提到的,可以设置任意数量的拦截器连接到单个的Source。
拦截器是需要命名的组件,每个拦截器实例必须限定一个名字。为了给Source添加拦截器,需要列出Source应该连接的拦截器名字,这些拦截器就是Source应该连接到Source配置中interceptors参数的值代表的拦截器。原配置中以interceptors. 开头的、后面跟着拦截器名称和参数的所有值都传递给拦截器。
2. Channel选择器
Channel选择器是决定Source接收的一个特定事件写入哪些Channel的组件。它们告知Channel处理器,然后将事件写入到每个Channel。
由于Flume并不是两阶提交(不会等所有事件都写入成功后再一起提交,而是写一个提交一个),事件被写入到一个Channel,然后在事件被写入到下一个Channel之前提交。如果写入一个Channel时出现故障,可能已经发生在其他Channel的相同事件的写入不能被回滚。当这样的故障发生时,Channel处理器抛出ChannelException并且事务失败。如果Source试图再次写入相同的事件(在大多数情况下,它会重试,只有类似Syslog、Exec等Source不能重试,因为没有办法再次生成相同的数据),重复的事件将写入到Channel,而先前的提交实际上是成功的,这是在Flume管道发生重复的一种情况。
Channel选择器配置是通过Channel处理器完成的,虽然配置看起来像Source子组件的配置。传递到Channel选择器的所有参数作为Source的上下文中的参数使用selector后缀传递。对于每个Source,选择器通过使用一个配置参数type指定。Channel选择器可以指定一组Channel是必需的(required),另一组是可选的(optional)。
Flume内置两种Channel选择器:replicating和mutiplexing。如果Source的配置中没有指定选择器,那么会自动使用复制Channel选择器。
replicating Channel选择器复制每个事件到Source的channels参数所指定的所有Channel中。
multiplexing Channel选择器是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入的Channel,基于一个特定的事件头的值进行路由。