标题比较长,实在想不出什么好的描述。大概要解决的问题就是,同一个服务同时监听多个topic,且在每个topic中的group都不相同,具体看问题描述吧。

一、问题背景

 前几天部署了一套系统,每个服务都搭建了多个节点,而且是没有主从关系的节点。每个服务中有很多东西是放到缓存中的,配置多节点之后,相同服务的不同节点出现了缓存不一致的问题。

二、问题描述

 刚开始想出一种解决方案,监听同一个topic1,每个节点分到一个group中,这样每次生产者生产消息后,kafka会将消息分发到所有group中,消息中带一个消息类型字段(mq_type)。
各个节点由于处于不同group中都会消费此消息,然后根据mq_type判断是否该处理此消息。
 然而,pass。原因:由于此系统(系统B)中的服务1还与系统A有消费与生产消息的关系,都放到一个topic下数据不规范。而且如果多个服务1同时消费消息,会进行读表改表操作,还得做处理。

 emmm,又想出了一种解决方案,系统B中每个节点还是分到不同的group中,当某个服务1消费到系统A发送的消息,需要刷新缓存时,该节点对所有节点通过系统B内部的消息队列topic2进行广播,各个服务接收到消费消息后根据消息类型进行缓存的更新。
具体系统图如下:
kafka一个服务配置到不同topic的不同group
图片备用链接
ps:以上区分两个topic是为了规范来自不同的渠道的数据走不同的topic,如果没有这种要求完全没有必要做如下这种操作,可以直接通过group和消息内容去做区分
 如上图,系统A通过topic1向系统B中的服务1发送消息,系统B中服务1和服务2以及他们的其他节点在系统B中通过topic2发送消息。
 可以看出,系统B中的服务1扮演了三个角色:系统A发送消息的消费者,系统B内部消息的生产者和消费者。可以得出如下问题:

对于服务1,需要将其配置为监听两个topic,分别监听topic1和topic2
系统A向系统B发送消息时,服务1以及他的其他节点处于topic1的同一个group下,即只有一个服务1节点会去消费系统A发来的消息
系统B内部之间发送消息时,每个服务和节点都处于topic2的不同group下

 说到这里,其实就清楚很多了。其实就是想让服务1-1和他的其他节点在topic1中都处于group-A2B中,服务1-1在topic2中处于group-service1-1中,服务1-2在topic2中处于group-service1-2中。

三、需求实现

3.1 代码基础

 kafka的基础代码请参照我的以下两篇博客,本次修改都是基于这些代码的基础上改造的
Kafka及Spring&Kafka整合
kafka动态配置topic

3.2 生产者

 kafka生产者发送消息时,会向该topic下的所有group发送消息,而每个group只会有一个消费者进行消费。所以生产者不用进行更改。

3.3 消费者

3.3.1 消费者的配置

 以下消费者以服务1-1为例,其他节点服务同理。
 由于同一个服务要扮演两个消费者,所以我们需要不同的配置文件用来生成不同的消费者

//首先是获取公共配置方法
    public Map<String, Object> getCommonPropertis(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put("auto.offset.reset", "latest");// 一般配置earliest 或者latest 值
        return props;
    }

//然后不同的用来生成不同消费者的工厂
    //topic1的消费者
    public ConsumerFactory<String, String> consumerFactoryA2B() {
        Map<String, Object> properties = getCommonPropertis();
        //所在group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-A2B");
        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
    
    
    //系统B内topic2的每个服务的group我这里用服务名+ip+端口命名
    String GROUP_NAME = "service1-1-"+serviceInfoUtil.getIpAddress()+"-"+serviceInfoUtil.getLocalPort();
    
    //topic2的消费者
    public ConsumerFactory<String, String> consumerFactoryB2B(){
            Map<String, Object> properties = getCommonPropertis();
            //所在group
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_NAME);
            return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
    
//再通过不同的配置工厂生成实例bean
    //topic1的消费者bean
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryA2B() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryA2B());//通过不同工厂获取实例
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }  
    
    //topic2的消费者bean
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryB2B() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryB2B());//通过不同工厂获取实例
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }  
    

3.3.2 消费者的使用

 以上消费者的配置就算完成了,接下来就可以直接使用了。

     /**
     * 监听B2B所有消息
     * @param record
     */
    @KafkaListener(topics = "#{'${kafka.B2B.listener_topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactoryB2B")
    public void B2Bconsume(ConsumerRecord<?, ?> record){
        recordDeal(record);
    }

    /**
     * 监听A2B的所有消息
     * @param record
     */
    @KafkaListener(topics = "#{'${kafka.A2B.listener_topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactoryA2B")
    public void A2Bconsume(ConsumerRecord<?, ?> record) {
        recordDeal(record);
    }
    
//containerFactory = "kafkaListenerContainerFactoryA2B"  主要就是这个containerFactory参数,用它控制是哪个实例

3.3.3 获取服务启动的ip和端口类

@Configuration
public class ServiceInfoUtil {
    public static String getIpAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }
    public static String getLocalPort() throws MalformedObjectNameException {
        MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"),
                Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));
        String port = objectNames.iterator().next().getKeyProperty("port");
        return port;
    }
}    

3.3.4 最后

 这样修改后启动时,通过配置文件中的kafka.A2B.listener_topics去判断这个消费者该监听哪个topic,通过containerFactory = "kafkaListenerContainerFactoryA2B"判断这个消费者在这个topic中属于哪个group。
然后发送消息测试,成了。

四、感谢大佬

这几个大佬的对于kafka的group的讲解比较好:
KAFKA 多个消费者同一个GROUPID,只有一个能收到消息的原因
Kafka消费组(consumer group)
springboot 集成kafka 实现多个customer不同group

相关文章: