【发布时间】:2018-11-14 05:06:09
【问题描述】:
如何设置 log.cleanup.policy:使用 spring 配置紧凑
【问题讨论】:
标签: kafka-consumer-api kafka-producer-api spring-cloud-stream spring-kafka
如何设置 log.cleanup.policy:使用 spring 配置紧凑
【问题讨论】:
标签: kafka-consumer-api kafka-producer-api spring-cloud-stream spring-kafka
此属性是代理配置:http://kafka.apache.org/documentation/#brokerconfigs。因此必须在代理端进行配置。从 Spring Cloud Stream Kafka Binder 的角度来看,没有什么可做的。它只是现有 Apache Kafka 代理的客户端。
如果从 Spring Kafka 的角度谈KafkaEmbedded,有以下几种选择:
/**
* Specify the properties to configure Kafka Broker before start, e.g.
* {@code auto.create.topics.enable}, {@code transaction.state.log.replication.factor} etc.
* @param brokerProperties the properties to use for configuring Kafka Broker(s).
* @return this for chaining configuration
* @see KafkaConfig
*/
public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
this.brokerProperties.putAll(brokerProperties);
return this;
}
/**
* Specify a broker property.
* @param property the property name.
* @param value the value.
* @return the {@link KafkaEmbedded}.
* @since 2.1.4
*/
public KafkaEmbedded brokerProperty(String property, Object value) {
this.brokerProperties.put(property, value);
return this;
}
【讨论】:
log.cleanup.policy 是代理配置(在 server.properties 中),而不是客户端属性。
更改单个主题的策略
kafka-topics --zookeeper localhost:2181 --alter --topic myTopic --config cleanup.policy=compact
或
kafka-configs --zookeeper localhost:2181 --entity-type=topics --entity-name=mytopic --alter --add-config cleanup.policy=compact
(因为第一个已弃用)
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
【讨论】:
根据以下 Gary 和 Artem 的答案更新此答案,以避免任何混淆。
您可以使用密钥 spring.cloud.stream.kafka.binder.configuration... 传递任意 kafka 客户端配置。但是,由于log.cleanup.policy 是代理级别属性,因此您不能从活页夹中以这种方式使用它。您需要在代理上设置它。请参阅下面的答案以获取更多信息。
【讨论】: