【问题标题】:Kafka producer with spring kafka template factory implementationKafka 生产者与 spring kafka 模板工厂实现
【发布时间】:2021-04-23 17:38:43
【问题描述】:

我有一个简单的 rest api(方法 1),它使用 kafka-clients api 生成消息发送到 kafka 集群。

Spring boot rest -> producer.send (kafka-clients lib) -> kafka 集群

此外,我还有另一个实现(方法 2)

Spring boot rest -> producer factory实现(单个配置spring对象)-> kafka模板发送(spring-kafka)-> kafka集群

我观察到方法 2 比方法 1 花费更多时间。例如,方法 1 用于单个消息花费 40 毫秒,方法 2 花费近 100 毫秒。

我想尽量减少使用基于生产者工厂的实现来推送消息所花费的时间。关于如何调整它的任何想法?

实现细节如下:(生产者工厂)

@Configuration
public class KafkaConfig {

    @Value("${bootstrap.servers}")
    String bootStrapServers;

    @Bean
    public Map<String,Object> configs(){
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put("bootstrap.servers", bootStrapServers);
        properties.put("acks", "0");
        properties.put("retries", 0);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }
    
    @Bean
    public ProducerFactory<String,String> factory(){
        return new DefaultKafkaProducerFactory<>(configs());
    }
    
    @Bean
    public KafkaTemplate<String,String> template(){
        return new KafkaTemplate<>(factory());
    }
}

Controller : 

    @Autowired
    private KafkaTemplate<String,String> template;

    public ResponseEntity<String> producer(@PathVariable String topicName, @RequestBody String requestBody) throws JsonProcessingException {

        try {
             template.send(topicName,requestBody);
        } catch (Exception ex) {
            logger.error(ex);
        } finally {
        }
        
        return ResponseEntity.ok().build();
    }

【问题讨论】:

  • 你能显示自定义生产者工厂配置属性吗?
  • @Deadpool properties.put("bootstrap.servers", bootStrapServers); properties.put("acks", "0"); properties.put("重试", 0); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • @Deadpool - 我使用了生产者配置的默认值。没有在配置属性中添加任何内容。
  • 显示您的代码和配置(编辑问题,而不是在评论中)。 KafkaTemplate 是围绕 KafkaProducer 的非常轻量级的包装器,因此不应增加太多开销。
  • @GaryRussell 代码添加到问题中。

标签: spring apache-kafka spring-kafka


【解决方案1】:

我确实看到比预期更多的开销(类似于您的结果)。我会做一些分析,看看是否可以改进。

框架总是会增加一些开销,但最重要的是,与所有 Spring 项目一样,如果需要,您仍然可以下拉到较低级别的 API。

@SpringBootApplication
public class So65791199Application {

    public static void main(String[] args) {
        SpringApplication.run(So65791199Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ProducerFactory<String, String> pf) {

        return args -> {
            StopWatch watch = new StopWatch();
            ListenableFuture<SendResult<String, String>> future = template.send("so65791199", "foo");
            future.get(10, TimeUnit.SECONDS);
            List<ListenableFuture<SendResult<String, String>>> futures = new LinkedList<>();
            watch.start("template");
            IntStream.range(0, 10000).forEach(i -> {
                futures.add(template.send("so65791199", "foo"));
            });
            for (ListenableFuture<SendResult<String, String>> fut : futures) {
                fut.get(10, TimeUnit.SECONDS);
            }
            watch.stop();

            Producer<String, String> producer = new KafkaProducer<>(pf.getConfigurationProperties());
            ProducerRecord<String, String> pr = new ProducerRecord<>("so65791199", 0, null, "foo");
            Future<RecordMetadata> fut = producer.send(pr);
            fut.get(10, TimeUnit.SECONDS);
            watch.start("raw producer");
            List<Future<RecordMetadata>> futs = new LinkedList<>();
            IntStream.range(0, 10000).forEach(i -> {
                futs.add(producer.send(new ProducerRecord<>("so65791199", 0, null, "foo")));

            });
            for (Future<RecordMetadata> futr : futs) {
                futr.get(10, TimeUnit.SECONDS);
            }
            watch.stop();
            producer.close();
            System.out.println(watch.prettyPrint());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so65791199").partitions(1).replicas(1).build();
    }

}
StopWatch '': running time = 126595537 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
088742103  070%  template
037853434  030%  raw producer

【讨论】:

  • 感谢您的分析
猜你喜欢
  • 1970-01-01
  • 2021-03-06
  • 2019-05-09
  • 1970-01-01
  • 2020-05-19
  • 1970-01-01
  • 2018-11-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多