Flume Source、Channel处理器、拦截器和Channel选择之间的交互

 

  1. 接收事件;
  2. 处理事件;
  3. 将事件传递给拦截器链;
  4. 将每个事件传递给Channel选择器;
  5. 返回写入事件的Channel列表;
  6. 将所有事件写入每个必需的Channel,只有一个事务被打开;对于每个Channel,所有事件都写为事务的一部分;
  7. 利用可选Channel重复相同动作。

Flume本身不限制Agent中Source、Channel和Sink的数量。因此Flume Source可以接收事件,并可以通过配置将事件复制到多个目的地。这使得Source通过Channel处理器、拦截器和Channel选择器,写入数据到Channel成为可能。

每个Source都有自己的Channel处理器。每次Source将数据写入Channel,它是通过委派该任务到其Channel处理器来完成的。然后,Channel处理器将这些事件传到一个或多个Source配置的拦截器中。

拦截器是一段代码,可以基于某些它完成的处理来读取时间和修改或删除时间。基于某些标准,如正则表达式,拦截器可以用来删除事件,为事件添加新报头或移除现有的报头等每个Source可以配置成使用多个拦截器,按照配置中定义的顺序被调用,将拦截器的结果传递给链的下一个单元这就是所谓的责任链的设计模式。一旦拦截器处理完事件,拦截器链返回的事件列表传递到Channel列表,即通过Channel选择器为每个事件选择的Channel。

Source可以通过处理器-拦截器-选择路由器写入多个Channel。Channel选择器是决定每个事件必须写入到Source附带的哪个Channel的组件。因此拦截器可以用来插入或删除事件中的数据,这样Channel选择器可以应用一些条件在这些事件上,来决定事件必须写入哪些Channel。Channel选择器可以对事件应用任意过滤条件,来决定每个事件必须写入哪些Channel,以及哪些Channel是必需的或可选的。

写入到必需的Channel失败将会导致Channel处理器抛出ChannelException,表明Source必须重试该事件(实际上,所有的事件都在那个事务中),而未能写入可选Channel失败仅仅忽略它。一旦写出事件,处理器将会对Source指示成功状态,可能发送确认(ACK)给发送该事件的系统,并继续接受更多的事件。

1.  拦截器

拦截器(Interceptor)是简单插件式组件,设置在SourceSource写入数据的Channel之间。Source接收到的事件在写入到对应的Channel之前,拦截器都可以转换或删除这些事件。每个拦截器实例只处理同一个Source接收到的事件。拦截器可以基于任意标准或转换事件,但是拦截必须返回尽可能多(或尽可能少)的事件,如同原始传递过来的事件。

多个拦截器组成一个有序的拦截器链。在一个链条中,可以添加任意数量的拦截器去转换从单个Source中来的事件。Source将同一个事务的所有事件传递给Channel处理器,进而传递给拦截器链条,然后事件被传递给链条中的第一个拦截器。通过拦截器转换时间产生的一系列事件,传递到链条的下一个拦截器,以此类推。链条最后一个拦截器返回的最终事件列表写入到Channel中。

因为拦截器必须在事件写入Channel之前完成转换操作,只有当拦截器已成功转换事件后,RPC Source(和任何其他可能产生超时的Source)才会响应发送事件的客户端或Sink。因此,在拦截器中进行大量重量级的处理并不是一个好主意。如果拦截器中的处理时重量级的、耗时的,那么需要相应地调整超时时间属性

Flume配置文件中,所有拦截器通用的唯一配置参数是type参数,该参数必须是拦截器的别名或者Builder类的完全限定类名(FQCN),该Builder类用于创建拦截器。正如前面提到的,可以设置任意数量的拦截器连接到单个的Source。

拦截器是需要命名的组件,每个拦截器实例必须限定一个名字。为了给Source添加拦截器,需要列出Source应该连接的拦截器名字,这些拦截器就是Source应该连接到Source配置中interceptors参数的值代表的拦截器。原配置中以interceptors. 开头的、后面跟着拦截器名称和参数的所有值都传递给拦截器。

2.  Channel选择器

Channel选择器是决定Source接收的一个特定事件写入哪些Channel的组件。它们告知Channel处理器,然后将事件写入到每个Channel。

由于Flume并不是两阶提交(不会等所有事件都写入成功后再一起提交,而是写一个提交一个),事件被写入到一个Channel,然后在事件被写入到下一个Channel之前提交。如果写入一个Channel时出现故障,可能已经发生在其他Channel的相同事件的写入不能被回滚。当这样的故障发生时,Channel处理器抛出ChannelException并且事务失败。如果Source试图再次写入相同的事件(在大多数情况下,它会重试,只有类似SyslogExecSource不能重试,因为没有办法再次生成相同的数据),重复的事件将写入到Channel,而先前的提交实际上是成功的,这是在Flume管道发生重复的一种情况。

Channel选择器配置是通过Channel处理器完成的,虽然配置看起来像Source子组件的配置。传递到Channel选择器的所有参数作为Source的上下文中的参数使用selector后缀传递。对于每个Source,选择器通过使用一个配置参数type指定。Channel选择器可以指定一组Channel是必需的(required),另一组是可选的(optional)。

Flume内置两种Channel选择器:replicating和mutiplexing。如果Source的配置中没有指定选择器,那么会自动使用复制Channel选择器。

replicating Channel选择器复制每个事件到Sourcechannels参数所指定的所有Channel

multiplexing Channel选择器是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入的Channel,基于一个特定的事件头的值进行路由

相关文章: