【问题标题】:Can we use multiple kafka template in spring boot?我们可以在 Spring Boot 中使用多个 kafka 模板吗?
【发布时间】:2020-01-14 05:35:22
【问题描述】:

在我的 spring boot kafka 发布者应用程序中,我想支持以 String(json) 或字节格式发布消息,因为我想同时支持 json 和 avro。但是 Spring Boot 中的 Kafka 模板让我们只定义其中一个模板。有没有办法同时使用这两个模板或任何其他方式来提供对 json 和 avro 的支持?

KafkaTemplate<String, String> 仅适用于字符串,但我也想发布 avro,它应该类似于 KafkaTemplate<String, byte[]>

【问题讨论】:

  • 嗯..为它创建一个新bean?即使两个 bean 的类型相同,您也可以使用 Qualifier 或 Primary(注解)。这根本不应该是一个问题。

标签: apache-kafka spring-kafka kafka-producer-api


【解决方案1】:

您可以尝试使用不同的配置创建 KafkaTemplate:

@Bean
public ProducerFactory<String, String> producerFactoryString() {
    Map<String, Object> configProps = new HashMap<>();
    //additional config parameters .... 
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
    Map<String, Object> configProps = new HashMap<>();
    //additional config parameters ....
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    return new KafkaTemplate<>(producerFactoryString());
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
    return new KafkaTemplate<>(producerFactoryByte());
}

【讨论】:

    【解决方案2】:

    您可以创建 Kafka 配置。我必须将数据发送到 2 个不同的服务器。

    @Configuration
    public class KafkaConfig {
    
        private final MosaicKafkaConfig mosaicKafkaConfig;
        private final StreamKafkaConfig streamKafkaConfig;
    
        public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
            this.mosaicKafkaConfig = mosaicKafkaConfig;
            this.streamKafkaConfig = streamKafkaConfig;
        }
    
        @Bean
        public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
            KafkaProperties kafkaProperties = new KafkaProperties();
            KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
            ResourceLoader resourceLoader = new DefaultResourceLoader();
            Resource resource = resourceLoader.getResource(mosaicKafkaConfig.getSslTrustStoreLocation());
            ssl.setTrustStoreLocation(resource);
            ssl.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
            ssl.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());
            Map<String, String> props = kafkaProperties.getProperties();
            props.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
            props.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
            props.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());
    
            kafkaProperties.getProducer().setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
            kafkaProperties.getProducer().setClientId(mosaicKafkaConfig.getClientID());
            kafkaProperties.getProducer().setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());
    
            Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
            return new DefaultKafkaProducerFactory<>(configProps);
    
        }
    
        @Bean
        public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
            KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForMosaic);
            return kafkaTemplate;
        }
    
    
        @Bean
        public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
            KafkaProperties kafkaProperties = new KafkaProperties();
            KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
            ResourceLoader resourceLoader = new DefaultResourceLoader();
            Resource resource = resourceLoader.getResource(streamKafkaConfig.getSslTrustStoreLocation());
            ssl.setTrustStoreLocation(resource);
            ssl.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
            ssl.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());
            Map<String, String> props = kafkaProperties.getProperties();
            props.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
            props.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
            props.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());
    
            kafkaProperties.getProducer().setValueSerializer(streamKafkaConfig.getValaueSerializer());
            kafkaProperties.getProducer().setClientId(streamKafkaConfig.getClientID());
            kafkaProperties.getProducer().setBootstrapServers(streamKafkaConfig.getBootstrapServers());
    
            Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
            return new DefaultKafkaProducerFactory<>(configProps);
    
        }
    
        @Bean
        public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
            KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForStream);
            return kafkaTemplate;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2021-05-14
      • 1970-01-01
      • 2015-12-31
      • 2016-06-17
      • 1970-01-01
      • 2020-06-04
      • 2020-07-17
      • 2021-11-02
      • 2021-08-08
      相关资源
      最近更新 更多