【问题标题】:spring boot kafka generic JSON templateSender(spring boot kafka 通用 JSON 模板发送方)
【发布时间】:2020-12-22 19:16:30
【问题描述】:
package com.bankia.apimanager.config;

import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer-side Kafka wiring: record keys are serialized with
 * {@link StringSerializer} and {@link RequestDTO} values with
 * {@link JsonSerializer}.
 */
@Configuration
public class KafkaConfiguration {

    /** Broker address list, resolved from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Assembles the raw producer settings: broker list plus key and value serializers.
     *
     * @return a mutable map of Kafka producer configuration entries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return settings;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return a factory for producers with {@code String} keys and {@link RequestDTO} values
     */
    @Bean
    public ProducerFactory<String, RequestDTO> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the template used to send {@link RequestDTO} records.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
        final ProducerFactory<String, RequestDTO> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

}
package com.bankia.apimanager.controller;

import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller mapped under {@code /infrastructure} that publishes a sample
 * {@link RequestDTO} to the Kafka topic {@code test}.
 */
@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {

    private static final Logger LOG = LoggerFactory.getLogger(InfraStructureRequestController.class);

    /** Topic every message from this controller is sent to. */
    private static final String TOPIC = "test";

    @Autowired
    private KafkaTemplate<String, RequestDTO> sender;

    /**
     * Handles {@code GET /infrastructure/test}: sends a fixed {@link RequestDTO} to
     * {@link #TOPIC} and registers a callback that logs the outcome of the send.
     *
     * @return the literal {@code "OK"}; the method does not wait for the send result
     */
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String postMessage() {
        ListenableFuture<SendResult<String, RequestDTO>> future =
                sender.send(TOPIC, new RequestDTO("Hola", "Paco"));
        future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
            @Override
            public void onSuccess(SendResult<String, RequestDTO> result) {
                LOG.info("Sent message with offset=[{}]", result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // The throwable is passed as the trailing argument so the stack trace
                // is logged as well, instead of only the exception message.
                LOG.error("Unable to send message due to : {}", ex.getMessage(), ex);
            }
        });
        return "OK";
    }
}

但是如果我现在想发送一个新的 DTO 对象呢?我是否必须声明一个新的 KafkaTemplate<String, NEWOBJECT>,并为配置中声明的每个对象的 Kafka 模板逐一自动装配?有没有另一种方法,只声明一个 KafkaTemplate,就可以用它发送任何类型的对象并自动序列化为 JSON?

【问题讨论】:

    标签: json spring-boot serialization apache-kafka configuration


    【解决方案1】:

    我认为,您可以指定一个通用的 KafkaTemplate<String, Object>,并将生产者的值序列化器设置为 JsonSerializer,如下所示:

    /**
     * Kafka producer configuration exposing a single template typed
     * {@code KafkaTemplate<String, Object>}; values are serialized with
     * {@link JsonSerializer}.
     */
    @Configuration
    public class KafkaConfiguration {

        /** Broker address list, resolved from {@code spring.kafka.bootstrap-servers}. */
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        /**
         * Builds the producer settings: broker list, String key serializer,
         * JSON value serializer.
         *
         * @return a mutable map of Kafka producer configuration entries
         */
        @Bean
        public Map<String, Object> producerConfigs() {
            final Map<String, Object> producerSettings = new HashMap<>();
            producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return producerSettings;
        }

        /**
         * Creates the producer factory from {@link #producerConfigs()}.
         *
         * @return a factory for producers with {@code String} keys and {@code Object} values
         */
        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            final Map<String, Object> producerSettings = producerConfigs();
            return new DefaultKafkaProducerFactory<>(producerSettings);
        }

        /**
         * Creates the template backed by {@link #producerFactory()}.
         *
         * @return a template with {@code String} keys and {@code Object} values
         */
        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate() {
            final ProducerFactory<String, Object> genericFactory = producerFactory();
            return new KafkaTemplate<>(genericFactory);
        }

    }
    

    【讨论】:

    • 您能否澄清一下到底是什么不起作用?谢谢!
    【解决方案2】:

    引用您的代码:

    • Value Serializer 被正确定义为 JsonSerializer,它将任何类型的对象转换为 JSON。
    /**
     * Builds the Kafka producer settings: broker list, {@code StringSerializer}
     * for keys and {@code JsonSerializer} for values.
     *
     * @return a mutable map of Kafka producer configuration entries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> serializerSettings = new HashMap<>();
        // Connection
        serializerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Serialization: String keys, JSON values
        serializerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        serializerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return serializerSettings;
    }
    
    • 在 KafkaConfig 和 Controller 中的每一处,将 <String, RequestDTO> 更改为 <String, Object>。

    请记住,泛型仅保留到编译时为止(类型擦除)。

    【讨论】:

      【解决方案3】:

      有两种情况:

      场景 #1

      如果您想使用KafkaTemplate 将任何类型(如您的问题中提到的)发送到kafka,则无需声明自己的KafkaTemplate bean,因为Spring boot 在KafkaAutoConfiguration 中为您完成了此操作。

      package org.springframework.boot.autoconfigure.kafka;
      
      ...
      
      /**
       * Configuration class that contributes a {@link KafkaTemplate} bean built
       * from {@link KafkaProperties}. It is conditional on {@code KafkaTemplate}
       * being present on the classpath.
       */
      @Configuration(proxyBeanMethods = false)
      @ConditionalOnClass(KafkaTemplate.class)
      @EnableConfigurationProperties(KafkaProperties.class)
      @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
      public class KafkaAutoConfiguration {
      
          /** Bound Kafka properties; read below for the template's default topic. */
          private final KafkaProperties properties;
      
          /**
           * @param properties the Kafka properties to keep for bean creation
           */
          public KafkaAutoConfiguration(KafkaProperties properties) {
              this.properties = properties;
          }
      
          /**
           * Creates the {@link KafkaTemplate} bean. Because of
           * {@code @ConditionalOnMissingBean}, this definition applies only when no
           * {@code KafkaTemplate} bean has been defined already.
           *
           * @param kafkaProducerFactory factory the template is constructed with
           * @param kafkaProducerListener listener installed on the template
           * @param messageConverter provider of an optional converter; applied only
           *        when a unique candidate is available
           * @return the configured template, exposed as {@code KafkaTemplate<?, ?>}
           */
          @Bean
          @ConditionalOnMissingBean(KafkaTemplate.class)
          public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                  ProducerListener<Object, Object> kafkaProducerListener,
                  ObjectProvider<RecordMessageConverter> messageConverter) {
              KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
              // Install the converter only if exactly one candidate can be resolved.
              messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
              kafkaTemplate.setProducerListener(kafkaProducerListener);
              // Default topic comes from the template section of the bound properties.
              kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
              return kafkaTemplate;
          }
      }
      
      
      

      **Some Note**:

      1. 此配置类已用 @ConditionalOnClass(KafkaTemplate.class) 注释,这意味着:(来自 spring 文档--->)@Conditional 仅在指定类位于类路径上时匹配。

      2. kafkaTemplate bean 方法被注释为 @ConditionalOnMissingBean(KafkaTemplate.class),这意味着:(来自 spring 文档 ---->)@Conditional 仅在 BeanFactory 中尚未包含满足指定要求的 bean 时才匹配。

      3. 重要!在纯 Java 世界中,KafkaTemplate<?, ?> 不是(例如)KafkaTemplate<String, RequestDTO> 的子类型,因此您不能这样做:

      // "..." stands for any expression producing a wildcard-typed template.
      KafkaTemplate<?, ?> kf1 = ...;
      KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile-time error: KafkaTemplate<?, ?> is not assignable to KafkaTemplate<String, RequestDTO>
      

      因为 Java 参数化类型是不变的(invariant),如《Effective Java》第三版第 31 条所述。但在 Spring 中这是可行的:这个 bean 会被注入到您自己的服务中,您只需要在 kafkaTemplate 字段上指定您自己的泛型类型。例如:

      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Service;
      
      /**
       * Example service that injects {@link KafkaTemplate} under two different
       * generic parameterizations.
       */
      @Service
      public class KafkaService {
          /** Template declared with {@code Integer} keys and {@code String} values. */
          @Autowired
          private KafkaTemplate<Integer, String> kafkaTemplate1;
          /** Template declared with {@code Integer} keys and {@code RequestDTO} values. */
          @Autowired
          private KafkaTemplate<Integer, RequestDTO> kafkaTemplate2;
      }
      
      

      场景 #2

      如果您需要限制 kafka 记录的值类型,那么您需要指定自己的 kafka bean,如下所示:

      
      @Configuration(proxyBeanMethods = false)
      @ConditionalOnClass(KafkaTemplate.class)
      @EnableConfigurationProperties(CorridorTracingConfiguration.class)
      public class CorridorKafkaAutoConfiguration {
          
          @Bean
          @ConditionalOnMissingBean(KafkaTemplate.class)
          public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
                                                                    ProducerListener<Object, AbstractMessage> kafkaProducerListener,
                                                                    ObjectProvider<RecordMessageConverter> messageConverter) {
              KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
              messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
              kafkaTemplate.setProducerListener(kafkaProducerListener);
              kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
              return kafkaTemplate;
          }
      
      

      现在它只能注入到值类型为 AbstractMessage 的模板中,例如 KafkaTemplate<String, AbstractMessage> kafkaTemplate;键类型不必是 String,可以是任何其他类型。但是您可以通过它将 AbstractMessage 的任何子类型发送到 kafka。

      示例用法:

      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Service;
      
      /**
       * Example service that sends {@code AbstractMessage} subtypes through a
       * {@code KafkaTemplate<String, AbstractMessage>}.
       */
      @Service
      public class KafkaService {
          @Autowired
          private KafkaTemplate<String, AbstractMessage> kafkaTemplate;

          /**
           * Wraps the transaction in a {@code FraudRequest} and sends it to the
           * {@code fraud-request} topic, keyed by the request's source account number.
           *
           * @param trxRequest the transaction to forward
           */
          public void makeTrx(TrxRequest trxRequest) {
              final String recordKey = trxRequest.fromAccountNumber();
              final FraudRequest payload = new FraudRequest(trxRequest);
              kafkaTemplate.send("fraud-request", recordKey, payload);
          }

      }
      
      /**
       * Message type extending {@code AbstractMessage} that holds an amount and
       * the source and destination account numbers.
       * The annotations below are presumably Lombok's (imports are not shown in
       * this snippet) — TODO confirm.
       */
      @Accessors(chain = true)
      @Getter
      @Setter
      @EqualsAndHashCode(callSuper = true)
      @ToString(callSuper = true)
      public class FraudRequest extends AbstractMessage {
          // NOTE(review): if this is a monetary amount, float loses precision — confirm.
          private float amount;
          private String fromAccountNumber;
          private String toAccountNumber;
      
      ...
      
      }
      
      

      要限制 kafka 消息的键类型,请按照与上面相同的方式进行。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-10-31
        • 2019-05-30
        • 2022-01-02
        • 2021-07-23
        • 1970-01-01
        • 1970-01-01
        • 2013-01-21
        相关资源
        最近更新 更多