【问题标题】:Kafka Reply Time out when applications are auto scaled in PCF在 PCF 中自动缩放应用程序时 Kafka 回复超时
【发布时间】:2020-08-26 19:24:38
【问题描述】:

我正在使用 ReplyingKafkaTemplate 进行 Kafka 同步响应,并且仅在实例运行一次时才能获得响应。但是,如果应用程序扩展到多个实例,我会收到超时错误。

来自文档

配置单个回复主题时,每个实例必须使用不同的 group.id。在这种情况下,所有实例都会收到每个回复。

根据文档,如果我们需要使用不同的消费者组,这是否意味着我们需要手动运行具有不同消费者组的实例?如果我们使用 PCF 之类的工具,我们如何处理自动缩放。以下是我的 kafka 配置。

@Configuration
@EnableKafka
public class KafkaConfig {

    //My Properties


    @Bean
    public Map < String, Object > producerConfig() {
        Map < String, Object > props = new HashMap < > ();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public Map < String, Object > consumerConfig() {
        Map < String, Object > props = new HashMap < > ();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        return props;
    }

    @Bean
    public ProducerFactory < String, String > producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfig());
    }

    @Bean
    public ConsumerFactory < String, String > consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfig());
    }

    @Bean
    public KafkaTemplate < String, String > kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate < String, String, String > replyingKafkaTemplate
                            (ProducerFactory < String, String > pf, KafkaMessageListenerContainer < String, String > container) {
        ReplyingKafkaTemplate < String, String, String > rkt = new ReplyingKafkaTemplate(pf, container);
        rkt.setDefaultReplyTimeout(Duration.ofMillis(slaTime));
        rkt.setSharedReplyTopic(true);
        return rkt;
    }

    @Bean
    public KafkaMessageListenerContainer < String, String > replyContainer(ConsumerFactory < String, String > cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
        containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

        return new KafkaMessageListenerContainer < > (cf, containerProperties);
    }

    @Bean
    public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, String >> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory < String, String > factory = new ConcurrentKafkaListenerContainerFactory < > ();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(replyingKafkaTemplate(producerFactory(), replyContainer(consumerFactory())));
        return factory;
    }
}

【问题讨论】:

    标签: spring-boot apache-kafka spring-kafka pcfdev


    【解决方案1】:

    replyContainer bean 中,添加

    containerProperties.setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    containerProperties.setKafkaConsumerProperties(props);
    

    replyingKafkaTemplate中,添加

    rkt.setSharedReplyTopic(true);
    

    请求主题至少需要与最大横向扩展一样多的分区。 回复主题可以有任意数量的分区(包括 1 个)。

    使用 PCF,您可以使用 instanceIndex 构造 groupId,而不是使其随机化。

    您也可以使用instanceIndex 作为REPLY_PARTITION 标头并使用固定回复分区;在这种情况下,您至少需要与您期望使用的最大 instanceIndex 一样多的分区。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-10-20
      • 2018-08-22
      • 1970-01-01
      • 1970-01-01
      • 2022-11-30
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多