【问题标题】:Is there a code sample for multiple producers in spring kafka?spring kafka中是否有多个生产者的代码示例?
【发布时间】:2018-08-04 04:13:47
【问题描述】:

我有一个可能需要多个生产者的应用程序。我看到的所有代码示例似乎都支持单个生产者,在应用启动期间从应用读取配置。如果有多个生产者并且我们想传入不同的生产者配置,那么 Spring 是否提供开箱即用的支持?还是在这种情况下我应该不使用弹簧?

【问题讨论】:

    标签: spring apache-kafka spring-kafka


    【解决方案1】:

    您可以通过同一个 ProducerFactory 创建多个 Producer 实例 (KafkaTemplate)。

    如果您需要不同的 Kafka 配置,则需要不同的 ProducerFactory 实例。

    【讨论】:

      【解决方案2】:

      你必须创建两个不同的ProducerFactory 下面是示例

          import org.apache.kafka.clients.producer.ProducerConfig;
          import org.apache.kafka.common.serialization.StringSerializer;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.kafka.core.DefaultKafkaProducerFactory;
          import org.springframework.kafka.core.KafkaTemplate;
          import org.springframework.kafka.core.ProducerFactory;
      
          import java.util.HashMap;
      
          @Configuration
          public class KafkaProducerConfig {
      
      
              @Bean
              public ProducerFactory<String, String> confluentProducerFactory() {
      
                  HashMap<String, Object> configProps = new HashMap<String, Object>();
                  configProps.put(
                          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                          "localhost:9092");
                  configProps.put(
                          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                          StringSerializer.class);
                  configProps.put(
                          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                          StringSerializer.class);
                  return new DefaultKafkaProducerFactory<>(configProps);
              }
      
      
              @Bean
              public ProducerFactory<String, String> cloudraProducerFactory() {
      
                  HashMap<String, Object> configProps = new HashMap<String, Object>();
                  configProps.put(
                          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                          "localhost:9094");
                  configProps.put(
                          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                          StringSerializer.class);
                  configProps.put(
                          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                          StringSerializer.class);
                  return new DefaultKafkaProducerFactory<>(configProps);
              }
      
              @Bean(name = "confluent")
              public KafkaTemplate<String, String> confluentKafkaTemplate() {
                  return new KafkaTemplate<>(confluentProducerFactory());
              }
      
              @Bean(name = "cloudera")
              public KafkaTemplate<String, String> clouderaKafkaTemplate() {
                  return new KafkaTemplate<>(cloudraProducerFactory());
              }
      
          }
      
      
      
      
      public class ProducerExample {
      
          @Autowired
          @Qualifier("cloudera")
          private KafkaTemplate clouderaKafkaTemplate;
      
      
          @Autowired
          @Qualifier("confluent")
          private KafkaTemplate confluentKafkaTemplate;
      
          public void send() {
              confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
              clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
          }
      
      }
      

      【讨论】:

      • 和多个消费者?
      • 应用属性中能否定义不同的生产者工厂(即生产者kafka config)?
      • @user518066 是的,请参阅下面的答案。
      【解决方案3】:

      从 2.5 版开始,您可以使用 RoutingKafkaTemplate 在运行时根据目标主题名称选择生产者。 https://docs.spring.io/spring-kafka/reference/html/#routing-template

      【讨论】:

        【解决方案4】:

        如果您仍想像往常一样保留application.yaml 中的配置,并尽可能减少Java 配置,您可以扩展KafkaProperties.Producer

        
        @Configuration
        @ConfigurationProperties(prefix = "spring.kafka.producer-1")
        @RequiredArgsConstructor
        class FirstProducer extends KafkaProperties.Producer {
            private final KafkaProperties common;
        
            @Qualifier("producer-1")
            @Bean
            public ProducerFactory<?, ?> producerFactory() {
                final var conf = new HashMap<>(
                    this.common.buildProducerProperties()
                );
                conf.putAll(this.buildProperties());
                return new DefaultKafkaProducerFactory<>(conf);
        
            }
        
            @Qualifier("producer-1")
            @Bean
            public KafkaTemplate<?, ?> kafkaTemplate() {
                return new KafkaTemplate<>(this.producerFactory());
        
            }
        }
        
        @Configuration
        @ConfigurationProperties(prefix = "spring.kafka.producer-2")
        @RequiredArgsConstructor
        class SecondProducer extends KafkaProperties.Producer {
            private final KafkaProperties common;
        
            @Qualifier("producer-2")
            @Bean
            public ProducerFactory<?, ?> producerFactory() {
                final var conf = new HashMap<>(
                    this.common.buildProducerProperties()
                );
                conf.putAll(this.buildProperties());
                return new DefaultKafkaProducerFactory<>(conf);
        
            }
        
            @Qualifier("producer-2")
            @Bean
            public KafkaTemplate<?, ?> kafkaTemplate() {
                return new KafkaTemplate<>(this.producerFactory());
        
            }
        }
        
        

        【讨论】:

          猜你喜欢
          • 2020-07-20
          • 1970-01-01
          • 1970-01-01
          • 2019-05-09
          • 2021-09-03
          • 2016-07-19
          • 2018-04-28
          • 2018-11-16
          • 1970-01-01
          相关资源
          最近更新 更多