【问题标题】:Spring boot Rest api with Spring Kafka（Spring Boot REST API 与 Spring Kafka）
【发布时间】:2020-11-13 10:28:29
【问题描述】:

我设计了一个 Spring Boot REST API，提供 ADD 和 GET 两个方法：

    /**
     * REST controller exposing an add operation (POST) and a list operation (GET) for
     * products. Both operations are delegated to the injected {@link IProductProducer}.
     */
    @RestController("ProductV1Controller")
    public class ProductController {

        private final IProductProducer _productProducer;

        /**
         * Creates the controller.
         *
         * @param productProducer the producer every request is delegated to
         */
        public ProductController(IProductProducer productProducer) {
            _productProducer = productProducer;
        }

        /**
         * Handles POST requests by passing the validated request body to the producer.
         *
         * @param product the product taken from the request body
         */
        @PostMapping()
        void AddProduct(@Valid @RequestBody ProductViewModel product) {
            _productProducer.AddProduct(product);
        }

        /**
         * Handles GET requests by returning whatever the producer reports.
         *
         * @return the list returned by {@code IProductProducer.GetProducts()}
         */
        @GetMapping()
        List<ProductViewModel> Products() {
            // Call the producer exactly once. The previous version also stored a first result
            // in an unused local, so every GET triggered the producer twice (ProductProducer
            // publishes a Kafka message on each GetProducts() call).
            return _productProducer.GetProducts();
        }
    }

服务层

@Service

    public class ProductProducer implements IProductProducer{
        private final KafkaTemplate<String, Object> _template;
    
        public ProductProducer(KafkaTemplate<String, Object> _template) {
            this._template = _template;
        }
    
        @Override
        public List<ProductViewModel> GetProducts() {
            this._template.send(ProductTopicConstants.GET_PRODUCTS,null);
            return List.of(new ProductViewModel("","",0,"")); --> Need to return the value from the kafka
        }
    
        @Override
        public void AddProduct(ProductViewModel product) {
            this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
        }
       
    }

Kafka 监听器

 /**
  * Kafka listener registered for the {@code GET_PRODUCTS} topic (the same constant is used
  * as the listener id). Loads every product from the repository and returns the list.
  *
  * @return the result of {@code _productRepository.findAll()}
  */
 @KafkaListener(topics = ProductTopicConstants.GET_PRODUCTS, id = ProductTopicConstants.GET_PRODUCTS)
    public List<Product> GetProducts() {
        final List<Product> allProducts = _productRepository.findAll();
        return allProducts;
    }

在服务层的 GetProducts() 中，我需要返回 _productRepository.findAll() 查询得到的项目列表。

使用 Spring Kafka 实现 REST API 的最佳方法是什么？

【问题讨论】:

    标签: java spring spring-boot apache-kafka spring-kafka


    【解决方案1】:

    您需要使用ReplyingKafkaTemplate 将结果返回给其余控制器。

    ReplyingKafkaTemplate

    2.1.3 版引入了 KafkaTemplate 的一个子类来提供请求/回复语义。该类名为 ReplyingKafkaTemplate，除了超类中的方法之外，它还新增了一个方法 sendAndReceive。

    结果是一个 ListenableFuture，它会被异步地填充结果（或异常，例如超时）。该结果还有一个 sendFuture 属性，它是调用 KafkaTemplate.send() 的结果。您可以使用这个 future 来确定发送操作的结果。

    文档有一个例子。

    编辑

    /**
     * Self-contained demo of Kafka request/reply behind a REST endpoint: {@code GET /get}
     * sends a request record, a listener in the same application answers with a list of
     * strings, and the reply value is returned to the HTTP caller.
     */
    @SpringBootApplication
    @RestController
    public class So63058608Application {

        private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

        /** Topic the request is sent to; the same string is used as the listener id. */
        private static final String REQUEST_TOPIC = "so63058608-1";

        /** Topic the reply container listens on; the same string is used as its group id. */
        private static final String REPLY_TOPIC = "so63058608-2";

        /** Seconds to wait for the send result and, separately, for the reply. */
        private static final long WAIT_SECONDS = 10;

        @Autowired
        private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

        public static void main(String[] args) {
            SpringApplication.run(So63058608Application.class, args);
        }

        /**
         * Sends a record with null key and value to partition 0 of the request topic, logs the
         * metadata of the sent record, then waits for the reply.
         *
         * @return the value of the reply record
         * @throws Exception if sending or receiving fails or does not finish within the wait time
         */
        @GetMapping(path = "/get")
        public List<String> getThem() throws Exception {
            ProducerRecord<String, String> request = new ProducerRecord<>(REQUEST_TOPIC, 0, null, null);
            RequestReplyFuture<String, String, List<String>> pending =
                    this.replyTemplate.sendAndReceive(request);

            String sendMetadata =
                    pending.getSendFuture().get(WAIT_SECONDS, TimeUnit.SECONDS).getRecordMetadata().toString();
            LOG.info(sendMetadata);

            List<String> reply = pending.get(WAIT_SECONDS, TimeUnit.SECONDS).value();
            return reply;
        }

        /**
         * Listener on the request topic; its return value is the reply.
         *
         * @param payload the request payload, which may be absent
         * @return a new mutable list containing "foo", "bar" and "baz"
         */
        @KafkaListener(id = REQUEST_TOPIC, topics = REQUEST_TOPIC, splitIterables = false)
        @SendTo
        public List<String> returnList(@Payload(required = false) String payload) {
            List<String> reply = new ArrayList<>();
            reply.add("foo");
            reply.add("bar");
            reply.add("baz");
            return reply;
        }

        /**
         * Builds the request/reply template. The listener container factory is first given a
         * reply template, then the reply container is obtained and handed to the new template.
         */
        @Bean
        public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
                ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

            containerFactory.setReplyTemplate(kafkaTemplate(pf));
            ConcurrentMessageListenerContainer<String, List<String>> replies = replyContainer(containerFactory);
            return new ReplyingKafkaTemplate<>(pf, replies);
        }

        /**
         * Creates the container that consumes the reply topic, with its group id set to the
         * reply topic name and a logging batch error handler.
         */
        @Bean
        public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
                ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

            ConcurrentMessageListenerContainer<String, List<String>> replies =
                    containerFactory.createContainer(REPLY_TOPIC);
            replies.getContainerProperties().setGroupId(REPLY_TOPIC);
            replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
            return replies;
        }

        /** Plain template built on the given producer factory. */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
            KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
            return template;
        }

        /** Declares the request topic with one partition and one replica. */
        @Bean
        public NewTopic topic1() {
            NewTopic requests = TopicBuilder.name(REQUEST_TOPIC).partitions(1).replicas(1).build();
            return requests;
        }

        /** Declares the reply topic with one partition and one replica. */
        @Bean
        public NewTopic topic3() {
            NewTopic replies = TopicBuilder.name(REPLY_TOPIC).partitions(1).replicas(1).build();
            return replies;
        }

    }
    
    spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.properties.spring.json.trusted.packages=*
    
    spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    
    $ curl localhost:8080/get
    ["foo","bar","baz"]
    

    EDIT2

    并返回一些对象的列表...

    /**
     * Variant of the request/reply demo in which the reply is a list of {@code Foo} objects.
     * {@code GET /get} sends a request record, a listener in the same application answers,
     * and the reply value is logged and returned to the HTTP caller.
     */
    @SpringBootApplication
    @RestController
    public class So63058608Application {

        private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

        /** Topic the request is sent to; the same string is used as the listener id. */
        private static final String REQUEST_TOPIC = "so63058608-1";

        /** Topic the reply container listens on; the same string is used as its group id. */
        private static final String REPLY_TOPIC = "so63058608-2";

        /** Seconds to wait for the send result and, separately, for the reply. */
        private static final long WAIT_SECONDS = 10;

        @Autowired
        private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

        public static void main(String[] args) {
            SpringApplication.run(So63058608Application.class, args);
        }

        /**
         * Sends a record with null key and value to partition 0 of the request topic, logs the
         * metadata of the sent record, then waits for the reply and logs it.
         *
         * @return the value of the reply record
         * @throws Exception if sending or receiving fails or does not finish within the wait time
         */
        @GetMapping(path = "/get")
        public List<Foo> getThem() throws Exception {
            ProducerRecord<String, String> request = new ProducerRecord<>(REQUEST_TOPIC, 0, null, null);
            RequestReplyFuture<String, String, List<Foo>> pending =
                    this.replyTemplate.sendAndReceive(request);

            String sendMetadata =
                    pending.getSendFuture().get(WAIT_SECONDS, TimeUnit.SECONDS).getRecordMetadata().toString();
            LOG.info(sendMetadata);

            List<Foo> reply = pending.get(WAIT_SECONDS, TimeUnit.SECONDS).value();
            LOG.info(reply.toString());
            return reply;
        }

        /**
         * Listener on the request topic; its return value is the reply.
         *
         * @param payload the request payload, which may be absent
         * @return a new mutable list of three {@code Foo} objects built from "foo", "bar", "baz"
         */
        @KafkaListener(id = REQUEST_TOPIC, topics = REQUEST_TOPIC, splitIterables = false)
        @SendTo
        public List<Foo> returnList(@Payload(required = false) String payload) {
            List<Foo> reply = new ArrayList<>();
            reply.add(new Foo("foo"));
            reply.add(new Foo("bar"));
            reply.add(new Foo("baz"));
            return reply;
        }

        /**
         * Builds the request/reply template. The listener container factory is first given a
         * reply template, then the reply container is obtained and handed to the new template.
         */
        @Bean
        public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
                ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

            containerFactory.setReplyTemplate(kafkaTemplate(pf));
            ConcurrentMessageListenerContainer<String, List<Foo>> replies = replyContainer(containerFactory);
            return new ReplyingKafkaTemplate<>(pf, replies);
        }

        /**
         * Creates the container that consumes the reply topic, with its group id set to the
         * reply topic name and a logging batch error handler.
         */
        @Bean
        public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
                ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

            ConcurrentMessageListenerContainer<String, List<Foo>> replies =
                    containerFactory.createContainer(REPLY_TOPIC);
            replies.getContainerProperties().setGroupId(REPLY_TOPIC);
            replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
            return replies;
        }

        /** Plain template built on the given producer factory. */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
            KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
            return template;
        }

        /** Declares the request topic with one partition and one replica. */
        @Bean
        public NewTopic topic1() {
            NewTopic requests = TopicBuilder.name(REQUEST_TOPIC).partitions(1).replicas(1).build();
            return requests;
        }

        /** Declares the reply topic with one partition and one replica. */
        @Bean
        public NewTopic topic3() {
            NewTopic replies = TopicBuilder.name(REPLY_TOPIC).partitions(1).replicas(1).build();
            return replies;
        }

        /**
         * Builds the Jackson type describing a {@code List} of {@code Foo}. Neither argument is
         * consulted; the same type is returned on every call.
         *
         * @param data    the raw record bytes (unused)
         * @param headers the record headers (unused)
         * @return the collection type with {@code Foo} elements
         */
        public static JavaType returnType(byte[] data, Headers headers) {
            TypeFactory types = TypeFactory.defaultInstance();
            return types.constructCollectionLikeType(List.class, Foo.class);
        }

    }
    
    /**
     * Mutable value holder with a single string property, {@code bar}. It has a no-argument
     * constructor plus a getter/setter pair in addition to the convenience constructor.
     */
    class Foo {

        private String bar;

        /** Creates an instance whose {@code bar} is left unset ({@code null}). */
        public Foo() {
        }

        /**
         * Creates an instance with the given value.
         *
         * @param bar the initial value of the property
         */
        public Foo(String bar) {
            this.bar = bar;
        }

        /** @return the current value of {@code bar}, possibly {@code null} */
        public String getBar() {
            return bar;
        }

        /** @param bar the new value of the property */
        public void setBar(String bar) {
            this.bar = bar;
        }

        /** @return a string of the form {@code Foo [bar=<value>]} */
        @Override
        public String toString() {
            return String.format("Foo [bar=%s]", bar);
        }

    }
    
    spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
    
    [Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]
    

    【讨论】:

    • 是否有任何带有 Rest API 的示例应用程序?
    • 不,但我在答案中添加了一个示例。
    • 如何传递字符串或对象。当我尝试 RequestReplyFuture future = this._replyTemplate.sendAndReceive(new ProducerRecord(ProductTopicConstants.GET_PRODUCT, 0, null,"uu7777uuuuuuu"));我收到一个异常无法从 JSON 转换;
    • 不要将代码放入评论中；它渲染得不好；请改为编辑问题并附上完整的堆栈跟踪，然后留言说明你已经这样做了，这样我才会收到通知。
    • 我添加了一个新问题stackoverflow.com/questions/63099836/…
    猜你喜欢
    • 2021-08-28
    • 2019-03-11
    • 2020-10-24
    • 2020-03-15
    • 2023-03-25
    • 2022-11-03
    • 2020-11-02
    • 2018-04-25
    • 2017-04-23
    相关资源
    最近更新 更多