RocketMQ支持表达式过滤与类过滤两种模式,其中表达式又分为TAG和SQL92。类过滤模式允许提交一个过滤类到FilterServer,消息消费者从FilterServer拉取消息,消息经过FilterServer时会执行过滤逻辑。

基于表达式的消息过滤

  消息发送者在消息发送时如果设置了消息的tags属性,存储在消息属性中,先存储在CommitLog文件中,然后转发到消息消费队列,消息消费队列会用8个字节存储消息tag的hashcode,之所以不直接存储tag字符串,是因为将ConumeQueue设计为定长结构,加快消息消费的加载性能。
  RocketMQ基于表达式的消息过滤是在订阅时做过滤。在Broker端拉取消息时,遍历ConsumeQueue,只对比消息tag的hashcode,如果匹配则返回,否则忽略该消息。Consume在收到消息后,同样需要先对消息进行过滤,只是此时比较的是消息tag的值而不再是hashcode

Step1:消费者订阅消息主题与消息过滤表达式。构建订阅信息subscriptionData并加入到RebalanceImpl进行消息队列负载。
  subscriptionData的核心属性:
  1)String SUB_ALL:过滤模式,默认为全匹配。
  2)boolean classFilterMode:是否是类过滤模式,默认为false。
  3)String topic:消息主题名称。
  4)String subString:消息过滤表达式,多个用双竖线隔开,例如“TAGA||TAGB”。
  5)Set<String> tagsSet:消息过滤tag集合,消费端过滤时进行消息过滤的依据。
  6)Set<String> codeSet:消息过滤tag hashcode集合。
  7)String expressionType:过滤类型,TAG或SQL92。
Step2:根据订阅消息构建消息拉取标记。根据主题、消息过滤表达式构建订阅消息实体。构建消息过滤对象。
Step3:根据偏移量拉取消息后,首先根据ConsumeQueue条目进行消息过滤,如果不匹配则直接跳过该条消息,继续拉取下一条消息。
Step4:如果消息根据ConsumeQueue条目通过过滤,则需要从CommitLog文件中加载整个消息体,然后根据属性进行过滤。基于TAG模式,根据ConsumeQueue进行消息过滤时只对比tag的hashcode,所以基于TAG模式消息过滤,还需要在消息消费端对消息tag进行精确匹配。

  从消息拉取流程知道,消息拉取线程PullMessageService默认会使用异步方式从服务器拉取消息,如果消息过滤模式为TAG模式,并且订阅TAG集合不为空,则对消息的tag进行判断,如果集合中包含消息的TAG则返回给消费者消费,否则跳过。

 

消息过滤FilterServer

ClassFilter运行机制

  基于类模式过滤是指在 Broker 端运行1个或多个消息过滤服务器(FilterServer), RocketMQ 允许消息消费者自定义消息过滤实现类并将其代码上传到 FilterServer 上,消息消费者向 FilterServer 拉取消息,FilterServer将消息消费者的拉取命令转发到 Broker,然后对返回的消息执行消息过滤逻辑,最终将消息返回给消费端。

RocketMQ:(5) 消息过滤机制

1)Broker 进程所在的服务器会启动多个 FilterServer 进程。
2)消费者在订阅消息主题时会上传一个自定义的消息过滤实现类,FilterServer 加载并实例化。
3)消息消费者(Consume)向 FilterServer 发送消息拉取请求,FilterServer 接收到消息消费者消息拉取请求后,FilterServer 将消息拉取请求转发给 Broker,Broker 返回消息后在 FilterServer 端执行消息过滤逻辑,然后返回符合订阅信息的消息给消息消费者进行消费。

FilterServer 注册

  FilterServer在启动时会创建一个定时调度任务,每隔10s向Broker注册自己。
  Step1:FilterServer从配置文件中获取Broker地址,然后将FilterServer所在机器的IP与监听端口发送到Broker服务器。
  Step2:FilterServer与Broker通过心跳维持FilterServer在Broker端的注册,同样在Broker每隔10s扫描一下该注册表,如果30s内未收到FilterServer的注册信息,将关闭Broker与FilterServer的连接。Broker为了避免Broker端FilterServer的异常退出导致FilterServer进程越来越少,同样提供一个定时任务每30s检测一下当前存活的FilterServer进程个数。
  经过上面的步骤,Broker上已经保存了FilterServer的信息。那么NameServer中关于Broker的filterServer信息是如何从消息服务器(Broker)传输到NameServer的呢?Broker通过与所有NameServer的心跳包向NameServer注册Broker上存储的FilterServer列表,指引消息消费者正确从FilterServer上拉取消息。Brokers每30s向所有NameServer发送心跳包,心跳包中包含了集群名称、Broker名称、Broker地址、BrokerId、haServer地址、topic配置、过滤服务器列表等。

类过滤模式订阅机制

  RocketMQ 通过DefaultMQPushConsumerimpl#subscribe方法来实现基于类模式的消息过滤,其参数分别代表消费组订阅的消息主题、类过滤全路径名、类过滤源代码字符串 。
  Step1:构建订阅信息,然后将该订阅信息添加到 Rebalancelmpl 中,其主要目标是Rebalancelmpl会对订阅信息表中的主题进行消息队列的负载,创建消息拉取任务,以便PullMessageService 线程拉取消息 。
  Step2:定时将消息端订阅信息中的类过滤模式的过滤类源码上传到 FilterServer。
  Step3:根据订阅的主题获取该主题的路由信息,如果该主题路由信息中的FilterServer缓存表不为空,则需要将过滤类发送到FilterServer上
  Step4:遍历主题路由表中的 fiIterServerTable,向缓存中所有的 FilterServer 上传消息过滤代码 。
  Step5:FilterServer 端处理 FilterClass上传并将其源码编译的实现为FilterClassManager 。
  Step6:根据消息消费组与主题名称构建 filterClasTable 缓存 key,从缓存表中尝试获取过滤类型信息FilterClasslnfo。如果缓存表中不包含 FilterClasslnfo 则表示第一次注册,设置 registerNew 为true;如果 FilterClasslnfo不为空,说明该消息消费组不是第一次注册。如果服务端开启允许消息消费者上传FilterClass,比较两个的 classCRC,如果不相同,说明FilterClass 的源码发生了变化,设置 registerNew 为 true 。
  Step7:如果是第一次注册,则创建 FilterClasslnfo,如果 FilterServer 允许消息消费者上传过滤类源码,则使用 JDK 提供的方法将源代码编译并加装,然后创建其实例,并强制类型转换为 MessageFilter,也就是自定义的消息过滤类必须实现 MessageFilter 接口 。
  上述整个过程就完成了消息消费端向 FilterServer 上传过滤类的过程,但如果FilterServer 不允许消息消费者上传 FilterC!ass ,则 filterServerTable 中存在的过滤类信息只包含className、classCRC、消息过滤类 MessageFilter 属性都为空,也就是说会忽略消息消费者上传的过滤类源代码,那过滤类的源码从哪获取呢?FilterServer 会开启一个定时任务,每隔1分钟从远程服务器下载过滤类源码,再将其编译与实例化。

消息拉取

  RocketMQ 消息的过滤发生在消息消费的时候,PullMessageService 线程默认从Broker上拉取消息,执行相关的过滤逻辑。
  Step1:在FilterServer过滤模式下,在消息拉取时,如果发现消息过滤模式为classFilter,将拉取消息服务器地址由原来的Broker地址转换成该Broker服务器所对应的FilterServer。
  Step2:获取该消息主题的路由信息,从路由信息中获取 Broker 对应的FilterServer列表,如果不为空则随机从FilterServer列表中选择一个FilterServer,发送拉取消息请求至相应的FilterServer上,FilterServer将拉取请求转发给Broker,然后对返回的消息执行消息过滤逻辑,在过滤服务器将消息过滤后再返回给消息消费者

类过滤模式相比TAG模式过滤的优势

1 )基于TAG模式消息过滤,由于在消息服务端进行消息过滤是匹配消息TAG的hashcode,导致服务端过滤并不十分准确,从服务端返回的消息最终并不一定是消息消费者订阅的消息,造成网络带宽的浪费,而基于类模式的消息过滤所有的过滤操作全部在FilterServer端进行。
2 )由于FilterServer 与 Broker 运行在同一台 机器上,消息的传输是通过本地回环通信,不会浪费Broker端的网络资源 。

 

相关文章: