【问题标题】:Create message liseners dynamically for the topics为主题动态创建消息侦听器
【发布时间】:2020-08-31 03:51:08
【问题描述】:

我正在分析一个关于创建可以部署在多个微服务中的通用消费者库的问题(它们都是基于 Spring 的)。要求有大约 15-20 个主题来监听。如果我们使用基于注释的 kafka 监听器,我们需要为每个微服务添加更多代码。有什么方法可以让我们根据某个 xml 文件动态创建消费者,每个消费者都可以注入这些数据

  1. 主题
  2. groupid
  3. 分区
  4. 过滤器(如果有)

有了注解,设计很死板。我唯一能想到的办法是,我们可以在解析xml配置后创建messagelisteners,每个topic都有自己的concurrentmessagelistenercontainer。

有没有其他更好的方法可以使用 spring ?

P.S:我对 spring 和 kafka 有点陌生。如果在解释要求时有任何混淆,请告诉我

谢谢, 拉贾塞卡

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    也许您可以使用主题模式。看看consumer properties。例如。听者

    @KafkaListener(topicPattern = "topic1|topic2")
    

    将收听topic1topic2

    如果您需要动态创建侦听器,必须格外小心,因为您必须关闭它。

    我会使用与 spring 的KafkaListenerAnnotationBeanPostProcessor 类似的方法。这个后置处理器负责处理@KafkaListeners。

    这是一个关于它如何工作的建议:

    public class DynamicEndpointRegistrar {
    
        private BeanFactory beanFactory;
        private KafkaListenerContainerFactory<?> containerFactory;
        private KafkaListenerEndpointRegistry endpointRegistry;
        private MessageHandlerMethodFactory messageHandlerMethodFactory;
    
        public DynamicEndpointRegistrar(BeanFactory beanFactory,
                KafkaListenerContainerFactory<?> containerFactory,
                KafkaListenerEndpointRegistry endpointRegistry, MessageHandlerMethodFactory messageHandlerMethodFactory) {
            this.beanFactory = beanFactory;
            this.containerFactory = containerFactory;
            this.endpointRegistry = endpointRegistry;
            this.messageHandlerMethodFactory = messageHandlerMethodFactory;
        }
    
        public void registerMethodEndpoint(String endpointId, Object bean, Method method, Properties consumerProperties,
                String... topics) throws Exception {
            KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
            registrar.setBeanFactory(beanFactory);
            registrar.setContainerFactory(containerFactory);
            registrar.setEndpointRegistry(endpointRegistry);
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    
            MethodKafkaListenerEndpoint<Integer, String> endpoint = new MethodKafkaListenerEndpoint<>();
            endpoint.setBeanFactory(beanFactory);
            endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    
            endpoint.setId(endpointId);
            endpoint.setGroupId(consumerProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
            endpoint.setBean(bean);
            endpoint.setMethod(method);
    
            endpoint.setConsumerProperties(consumerProperties);
            endpoint.setTopics(topics);
    
            registrar.registerEndpoint(endpoint);
            registrar.afterPropertiesSet();
        }
    
    }
    

    然后您应该能够动态注册一个监听器。例如

    DynamicEndpointRegistrar dynamicEndpointRegistrar = ...;
    MyConsumer myConsumer = ...; // create an instance of your consumer
    Properties properties = ...; // consumer properties
    
    // the method that should be invoked
    // (the method that's normally annotated with KafkaListener)
    Method method = MyConsumer.class.getDeclaredMethod("consume", String.class);
    
    dynamicEndpointRegistrar.registerMethodEndpoint("endpointId", myConsumer, method, properties, "topic");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-08-28
      • 1970-01-01
      • 1970-01-01
      • 2010-12-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多