【问题标题】:How do I properly externalize spring-boot kafka-streams configuration in a properties file?如何在属性文件中正确外部化 spring-boot kafka-streams 配置?
【发布时间】:2019-06-13 00:52:15
【问题描述】:

我正在尝试将我目前用 Java 代码编写的 spring-kafka 应用程序的配置外部化。 我应该将ProducerConfigConsumerConfig 值放入spring.kafka.streams.properties,或者如果我通过spring.kafka.producerspring.kafka.consumer 提供它们,它们是否会正确配置?

到目前为止,我似乎应该将所有配置放入 KafkaStreamsConfiguration 类型的 bean 中,以便配置我的 kafka-streams 应用程序。目前,我通过直接在代码中设置 ProducerConfigConsumerConfig 值来做到这一点。

当我将此配置外部化时,似乎在application.properties 文件中设置ProducerConfigConsumerConfig 的属性值与它们在spring-boot 创建的KafkaStreamsConfiguration 中无关(我确认了这一点通过在某处自动装配配置并查看它)。

如果我改为通过spring.kafka.streams.properties 提供ProducerConfigConsumerConfig 值,它们会显示在KafkaStreamsConfiguration 中。

这是我的旧 Java 配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put("replication.factor", replicationFactor);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "600000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new KafkaStreamsConfiguration(props);
    }

这会导致 ProducerConfigConsumerConfig 值在运行时不在 KafkaStreamsConfiguration 中:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.consumer.auto-offset-reset=latest #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

但是,这确实会导致 KafkaStreamsConfiguration 具有预期的值:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.streams.properties.group-id=<group_id> #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.compression-type=lz4 #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.streams.properties.auto-offset-reset=latest #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

当分别通过spring.kafka.producerspring.kafka.consumer 设置时,我期望ProducerConfigConsumerConfig 值传播到KafkaStreamsConfiguration。特别是因为我在 IntelliJ 中为 application.properties 中的生产者和消费者配置获得了 Intellisense。

也就是说,我是否需要确保通过 spring.kafka.streams.properties 进行设置才能正确配置应用程序?

【问题讨论】:

    标签: spring-boot apache-kafka-streams spring-kafka


    【解决方案1】:

    spring.kafka.consumer.group-id=&lt;group_id&gt; #this won't show up in KafkaStreamsConfiguration

    Streams 将group.id 设置为application.id 属性。

    public static final String APPLICATION_ID_CONFIG = "application.id";

    private static final String APPLICATION_ID_DOC = "流处理应用程序的标识符。在Kafka集群内必须是唯一的。它用作1)默认客户端ID前缀,2)成员管理的组ID,3 ) 变更日志主题前缀。";

    KafkaProperties

    streamsproducerconsumer 属性不同且不相关。

    spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration

    compression-type 不作为流的一流引导属性公开。你可以设置它使用

    spring.kafka.streams.properties.compression.type=gzip
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-08-18
      • 1970-01-01
      • 1970-01-01
      • 2018-02-21
      • 1970-01-01
      • 2016-08-19
      • 1970-01-01
      • 2015-10-27
      相关资源
      最近更新 更多