【问题标题】:How can we enable log compaction using spring cloud stream?我们如何使用 Spring Cloud Stream 启用日志压缩?
【发布时间】:2018-11-14 05:06:09
【问题描述】:

如何设置 log.cleanup.policy:使用 spring 配置紧凑

【问题讨论】:

    标签: kafka-consumer-api kafka-producer-api spring-cloud-stream spring-kafka


    【解决方案1】:

    此属性是代理配置:http://kafka.apache.org/documentation/#brokerconfigs。因此必须在代理端进行配置。从 Spring Cloud Stream Kafka Binder 的角度来看,没有什么可做的。它只是现有 Apache Kafka 代理的客户端。

    如果从 Spring Kafka 的角度谈KafkaEmbedded,有以下几种选择:

    /**
     * Specify the properties to configure Kafka Broker before start, e.g.
     * {@code auto.create.topics.enable}, {@code transaction.state.log.replication.factor} etc.
     * @param brokerProperties the properties to use for configuring Kafka Broker(s).
     * @return this for chaining configuration
     * @see KafkaConfig
     */
    public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
        this.brokerProperties.putAll(brokerProperties);
        return this;
    }
    
    /**
     * Specify a broker property.
     * @param property the property name.
     * @param value the value.
     * @return the {@link KafkaEmbedded}.
     * @since 2.1.4
     */
    public KafkaEmbedded brokerProperty(String property, Object value) {
        this.brokerProperties.put(property, value);
        return this;
    }
    

    【讨论】:

      【解决方案2】:

      log.cleanup.policy 是代理配置(在 server.properties 中),而不是客户端属性。

      更改单个主题的策略

      kafka-topics --zookeeper localhost:2181 --alter --topic myTopic --config cleanup.policy=compact
      

      kafka-configs --zookeeper localhost:2181 --entity-type=topics --entity-name=mytopic --alter --add-config cleanup.policy=compact
      

      (因为第一个已弃用)

      WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
           Going forward, please use kafka-configs.sh for this functionality
      

      【讨论】:

        【解决方案3】:

        根据以下 Gary 和 Artem 的答案更新此答案,以避免任何混淆。

        您可以使用密钥 spring.cloud.stream.kafka.binder.configuration... 传递任意 kafka 客户端配置。但是,由于log.cleanup.policy 是代理级别属性,因此您不能从活页夹中以这种方式使用它。您需要在代理上设置它。请参阅下面的答案以获取更多信息。

        【讨论】:

          猜你喜欢
          • 2021-06-07
          • 2016-02-08
          • 2019-03-27
          • 1970-01-01
          • 2012-05-22
          • 2020-03-24
          • 1970-01-01
          • 2023-03-13
          • 2020-09-23
          相关资源
          最近更新 更多