【Question title】: Creating KafkaListener without Annotation & without Spring Boot
【Posted】: 2019-07-19 22:33:49
【Question】:

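I am trying to create a Kafka consumer for a topic without using the @KafkaListener annotation. I want to do this because I am trying to create listeners dynamically, based on application.properties, without using Spring Boot.

I think the best approach is to create a KafkaListenerContainerFactory manually. Could someone provide an example of how to do this in its own class?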

【Comments】:

Tags: spring java-8 spring-kafka


【Answer 1】:
  • With Spring
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(String topic) {

    ContainerProperties containerProperties = new ContainerProperties(topic);
    containerProperties.setMessageListener(new MyMessageListener());

    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
    KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    listenerContainer.setAutoStartup(false);
    // bean name is the prefix of kafka consumer thread name
    listenerContainer.setBeanName("kafka-message-listener");
    return listenerContainer;
}

private Map<String, Object> consumerProperties(){
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    return props;
}

static class MyMessageListener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // do something
    }
}
  • Without Spring (plain Kafka consumer)
     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "true");
     props.setProperty("auto.commit.interval.ms", "1000");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

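【Comments】:

    【Answer 2】:

    I had the same need. I did not want to use the low-level consumer and call poll myself. I wanted the same logic that @KafkaListener uses, only configured dynamically, and in particular I wanted to create multiple Kafka listeners based on configuration.

    The following solution is what I was looking for: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/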
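
As a rough illustration of the "based on configuration" part, the sketch below reads a comma-separated topic list with plain `java.util.Properties`, so no Spring Boot is involved. The property key `kafka.listener.topics`, the class name `ListenerTopics`, and the sample topic names are assumptions made up for this example; they are not defined by Spring or Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ListenerTopics {

    // Hypothetical property key, chosen for this sketch only.
    static final String TOPICS_KEY = "kafka.listener.topics";

    /** Parses a comma-separated topic list from already-loaded properties. */
    static List<String> topicsFrom(Properties props) {
        List<String> topics = new ArrayList<>();
        String raw = props.getProperty(TOPICS_KEY, "");
        for (String part : raw.split(",")) {
            String topic = part.trim();
            if (!topic.isEmpty()) {
                topics.add(topic); // skip blanks such as "a,,b"
            }
        }
        return topics;
    }

    public static void main(String[] args) throws IOException {
        // Inline text stands in for application.properties; a real application
        // would load the file from the classpath or the file system instead.
        Properties props = new Properties();
        props.load(new StringReader("kafka.listener.topics = orders, payments ,audit\n"));

        for (String topic : topicsFrom(props)) {
            // This is the point where one listener container per topic
            // would be created and started.
            System.out.println("would create a listener container for topic: " + topic);
        }
    }
}
```

Each topic returned by `topicsFrom` could then be handed to code that builds a `ContainerProperties` and a listener container for it, in the way the other answers in this thread do for a single topic.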

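    【Comments】:

      【Answer 3】:

      This starts a consumer thread in the background. It also sends a message to test-topic every 10 seconds, after an initial delay of 5 seconds.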

        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
        import org.springframework.kafka.core.DefaultKafkaProducerFactory;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.kafka.core.ProducerFactory;
        import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
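        // Note: in spring-kafka 2.2 and later, ContainerProperties lives in org.springframework.kafka.listener
        import org.springframework.kafka.listener.config.ContainerProperties;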
      
        import java.util.Date;
        import java.util.HashMap;
        import java.util.Map;
        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.TimeUnit;
      
        public class PeriodicProducerConsumer implements Runnable {
      
          KafkaTemplate<String, Object> kafkaTemplate;
          ScheduledExecutorService service;
      
          PeriodicProducerConsumer() {
            // Producer Declaration
            Map<String, Object> configs = new HashMap<>();
            configs.put("bootstrap.servers", "localhost:9092");
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      
            ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(configs);
            this.kafkaTemplate = new KafkaTemplate<>(producerFactory);
      
      
            // Consumer Declaration
            Map<String, Object> consumerConfig = new HashMap<>();
            // bootstrap.servers takes host:port pairs, not an http:// URL
            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "airbus-service-ka-consumer-group");
      
            DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(
                    consumerConfig,
                    new StringDeserializer(),
                    new StringDeserializer());
      
            String topic = "test-topic";
            ContainerProperties containerProperties = new ContainerProperties(topic);
            containerProperties.setMessageListener(new AirbusServiceKaMessageListener());
      
            ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(
                    kafkaConsumerFactory,
                    containerProperties);
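            // Starts the consumer thread(s). The container is never stopped in this example;
            // real code should keep a reference to it and call container.stop() on shutdown.
            container.start();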
          }
      
          public void start() {
            service = Executors.newSingleThreadScheduledExecutor();
            service.scheduleWithFixedDelay(this, 5, 10, TimeUnit.SECONDS);
          }
      
          public void stop() {
            service.shutdown();
          }
      
          @Override
          public void run() {
            String data = String.format("New Airbus Hello at %s", new Date());
            kafkaTemplate.send("test-topic", data);
          }
        }
      

      Here is the listener that handles the consumed messages:

      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.kafka.listener.MessageListener;
      
      public class AirbusServiceKaMessageListener implements MessageListener<String, Object> {
          private static final Logger LOG = LoggerFactory.getLogger(AirbusServiceKaMessageListener.class);
      
          @Override
          public void onMessage(ConsumerRecord<String, Object> data) {
              LOG.info("########################## New Consuming Message From Message Listener ##########################");
              LOG.info("Message # {}", data.value());
              LOG.info("#################################################################################################");
          }
      }
      

      Bean.xml

          <bean name="purekafkaProduceConsume" class="com.myntra.airbus.purekafka.PeriodicProducerConsumer" scope="singleton"
                init-method="start" destroy-method="stop">
          </bean>
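
The `start` and `stop` methods wired in through `init-method` and `destroy-method` rely on `ScheduledExecutorService.scheduleWithFixedDelay`. The stand-alone sketch below isolates that scheduling pattern without any Kafka dependency: a task runs after an initial delay, then again a fixed delay after each run finishes, until the scheduler is shut down. The class name `PeriodicTaskDemo`, the helper `runPeriodically`, and the millisecond delays are assumptions for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicTaskDemo {

    /**
     * Schedules the task with a fixed delay, waits until it has run at least
     * `runs` times (or a generous timeout expires), then shuts the scheduler down.
     * Returns true if the task ran the requested number of times.
     */
    static boolean runPeriodically(Runnable task, int runs, long initialDelayMs, long delayMs) {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        service.scheduleWithFixedDelay(() -> {
            task.run();
            latch.countDown();
        }, initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
        try {
            long budgetMs = initialDelayMs + delayMs * runs + 5_000;
            return latch.await(budgetMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Same call as stop() in the answer: no further runs are scheduled.
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean completed = runPeriodically(
                () -> System.out.println("tick at " + System.currentTimeMillis()),
                3, 50, 100);
        System.out.println("completed = " + completed);
    }
}
```

In the answer's class the task is the `kafkaTemplate.send(...)` call and the delays are 5 and 10 seconds; the shorter delays here only keep the demo quick.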
      

      【Comments】:
