【问题标题】:Implement Spring Service to send message to different Kafka topics based on configuration实现 Spring Service 以根据配置向不同的 Kafka 主题发送消息
【发布时间】:2021-05-18 07:51:40
【问题描述】:

我想使用 Spring Services 根据配置将数据发送到不同的 Kafka 消息:

ResponseFactory processingPeply = null;

        switch(endpointType)
        {
            case "email":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-email.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionEmailReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "sms":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sms.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionSmsReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "network":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-network.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
              
            default:
                processingPeply = ResponseFactory.builder().status("error").build();
        } 

我目前得到:

  • 变量“记录”已在范围中定义
  • 变量“sendResult”已在范围内定义
  • 变量“consumerRecord”已在范围内定义

您知道如何以更好的方式重新设计代码以便解决问题吗? 我想在 Spring Service 中使用 DRY 原则以减少代码。

【问题讨论】:

    标签: java spring spring-boot


    【解决方案1】:

    您可以自动连接所有 ReplyingKafkaTemplate 并查找与您的端点类型匹配的那个。

    @Autowired
    private List<ReplyingKafkaTemplate<String, Object, Object>> templates;
    
    ReplyingKafkaTemplate<String, Object, Object> template = null;
    for(ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate :  templates) {
        String defaultTopic = replyingKafkaTemplate.getDefaultTopic();
        if (defaultTopic.contains(endpointType)) {
            template = replyingKafkaTemplate;
            break;
        }
    }
    ProducerRecord<String, Object> record = new ProducerRecord<>(template.getDefaultTopic(), tf);
    RequestReplyFuture<String, Object, Object> replyFuture = template.sendAndReceive(record);
    SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
    ResponseFactory processingPeply = (ResponseFactory) consumerRecord.value();
    

    您还可以设置配置以创建查找类型的 bean,然后注入 Map&lt;String, ReplyingKafkaTemplate&gt; 以便于查找。由于我不知道您的设置,因此无法为您提供配置设置。

    @Autowired
    private Map<String, ReplyingKafkaTemplate<String, Object, Object>>> templates;
    
    ReplyingKafkaTemplate<String, Object, Object> template = templates.get(endpointType);
    ProducerRecord<String, Object> record = new ProducerRecord<>(template.getDefaultTopic(), tf);
    RequestReplyFuture<String, Object, Object> replyFuture = template.sendAndReceive(record);
    SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
    ResponseFactory processingPeply = (ResponseFactory) consumerRecord.value();
    

    【讨论】:

    • +1 消除 switch 语句的最佳重构,+1 我刚刚了解到您可以注入列表。
    • 这里是设置:stackoverflow.com/questions/65866763/…你能扩展第二个例子吗?
    • 我没有看到任何简单的映射方法,除了一次将所有这些映射到配置 bean 中以创建映射。类似@Bean Map&lt;String, ReplyingKafkaTemplate&gt; registerEndPointType(ReplyingKafkaTemplate emailReplyTemplate, ReplyingKafkaTemplate smsReplyTem....) { Map&lt;String, ReplyingKafkaTemplate&gt; map = new HashMap&lt;&gt;(); map.put("email", emailReplyTemplate); map.put("sms", smsReplyTem...) return map; }
    • 我不明白。能否请您稍微扩展一下示例?
    • 对不起,哪一部分?您可以创建一个映射类型的配置 bean,其中端点为键,模板为值。
    【解决方案2】:

    我认为您可以使用接口来分离将数据发送到不同端点的逻辑。看看下面的代码:

    发送数据和接收响应的主类。它对电子邮件、短信、网络发件人一无所知。

    package com.example.demo.service;
    
    import com.example.demo.dto.Response;
    import org.springframework.stereotype.Service;
    
    import java.util.List;
    
    @Service
    public class KafkaSender {
    
        private final List<EndpointSender> senders;
    
        public KafkaSender(List<EndpointSender> senders) {
            this.senders = senders;
        }
    
        public Response send(Object data, String endpoint) {
            return senders
                .stream()
                .filter(it -> it.supports(endpoint))
                .findAny()
                .map(it -> it.send(data))
                .orElseGet(() -> new Response("error"));
        }
    }
    

    然后我们创建这样的界面:

    package com.example.demo.service;
    
    import com.example.demo.dto.Response;
    
    public interface EndpointSender {
        Response send(Object obj);
        boolean supports(String endpoint);
    }
    

    以及实现:

    减少样板代码的基类:

    package com.example.demo.service.sender;
    
    import com.example.demo.dto.Response;
    import com.example.demo.service.EndpointSender;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
    import org.springframework.kafka.requestreply.RequestReplyFuture;
    import org.springframework.kafka.support.SendResult;
    
    import java.util.concurrent.TimeUnit;
    
    public abstract class BaseSender implements EndpointSender {
    
        public abstract ProducerRecord<String, Object> getRecord(Object obj);
    
        public abstract ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate();
    
        @Override
        public Response send(Object obj) {
            try {
                RequestReplyFuture<String, Object, Object> replyFuture = kafkaTemplate().sendAndReceive(getRecord(obj));
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
    
                return (Response) consumerRecord.value();
            } catch (Throwable t) {
                throw new RuntimeException(t);
            }
        }
    }
    

    以及发件人的实现: 电子邮件发件人:

    package com.example.demo.service.sender;
    
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class EmailSender extends BaseSender {
    
        private final ReplyingKafkaTemplate<String, Object, Object> processingTransactionEmailReplyKafkaTemplate;
    
        public EmailSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionEmailReplyKafkaTemplate) {
            this.processingTransactionEmailReplyKafkaTemplate = processingTransactionEmailReplyKafkaTemplate;
        }
    
        @Override
        public boolean supports(String endpoint) {
            return "email".equals(endpoint);
        }
    
        @Override
        public ProducerRecord<String, Object> getRecord(Object obj) {
            return new ProducerRecord<>("tp-email.request", obj);
        }
    
        @Override
        public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
            return processingTransactionEmailReplyKafkaTemplate;
        }
    }
    
    

    短信发件人:

    package com.example.demo.service.sender;
    
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class SmsSender extends BaseSender{
    
        private final ReplyingKafkaTemplate<String, Object, Object> processingTransactionSmsReplyKafkaTemplate;
    
        public SmsSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionSmsReplyKafkaTemplate) {
            this.processingTransactionSmsReplyKafkaTemplate = processingTransactionSmsReplyKafkaTemplate;
        }
    
        @Override
        public boolean supports(String endpoint) {
            return "sms".equals(endpoint);
        }
    
        @Override
        public ProducerRecord<String, Object> getRecord(Object obj) {
            return new ProducerRecord<>("tp-sms.request", obj);
        }
    
        @Override
        public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
            return processingTransactionSmsReplyKafkaTemplate;
        }
    }
    
    

    网络发件人:

    package com.example.demo.service.sender;
    
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class NetworkSender extends BaseSender{
    
        private final ReplyingKafkaTemplate<String, Object, Object> processingTransactionNetworkReplyKafkaTemplate;
    
        public NetworkSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionNetworkReplyKafkaTemplate) {
            this.processingTransactionNetworkReplyKafkaTemplate = processingTransactionNetworkReplyKafkaTemplate;
        }
    
        @Override
        public boolean supports(String endpoint) {
            return "network".equals(endpoint);
        }
    
        @Override
        public ProducerRecord<String, Object> getRecord(Object obj) {
            return new ProducerRecord<>("tp-network.request", obj);
        }
    
        @Override
        public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
            return processingTransactionNetworkReplyKafkaTemplate;
        }
    }
    
    

    【讨论】:

      【解决方案3】:

      应用 KISS 但不是那么干燥... 将每个案例中的每个代码块放入括号中

      case "email": {
      ...
      }
      break;
      ...
      

      这样做可以减少案例的范围,然后可以重用相同的变量名。

      【讨论】:

      • 为什么?它会起作用,你减少案例的范围,然后你可以重用相同的变量名
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-10-13
      • 2019-02-27
      • 1970-01-01
      • 1970-01-01
      • 2019-02-14
      • 2020-08-18
      • 1970-01-01
      相关资源
      最近更新 更多