【问题标题】:How to configure kafka topic retention policy during creation with Spring?如何在使用 Spring 创建期间配置 kafka 主题保留策略?
【发布时间】:2019-10-25 14:50:16
【问题描述】:

我需要在创建过程中配置特定主题的保留策略。我试图寻找解决方案我只能找到如下命令级别的更改命令

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000

谁能告诉我在创建过程中配置它的方法,比如spring-mvc中的xml或属性文件配置。

【问题讨论】:

    标签: java spring spring-boot spring-mvc apache-kafka


    【解决方案1】:

    我想您可以为此使用管理客户端 (https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html)。您可以在应用程序中创建管理客户端实例,并使用 create 或 alter topic 命令来操作主题配置,包括保留。

    【讨论】:

      【解决方案2】:

      Spring Kafka 允许您通过在应用程序上下文中声明 @Beans 来创建新主题。这将需要应用程序上下文中的 KafkaAdmin 类型的 bean,如果使用 Spring Boot,它将自动创建。您可以按如下方式定义您的主题:

      @Bean
      public NewTopic myTopic() {
          return TopicBuilder.name("my-topic")
                  .partitions(4)
                  .replicas(3)
                  .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
                  .build();
      }
      

      如果您不使用 Spring Boot,则还必须定义 KafkaAdmin bean:

      @Bean
      public KafkaAdmin admin() {
          Map<String, Object> configs = new HashMap<>();
          configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
          return new KafkaAdmin(configs);
      }
      

      如果要编辑现有主题的配置,则必须使用AdminClient,这是在主题级别更改retention.ms 的sn-p:

      Map<String, Object> config = new HashMap<>();                
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
                               
      AdminClient client = AdminClient.create(config);
                               
      ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
                  
      // Update the retention.ms value
      ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
      Map<ConfigResource, Config> updateConfig = new HashMap<>();
      updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
      
      AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
      Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
      configs.put(resource, Arrays.asList(op));
      
      AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
              alterConfigsResult.all();
      

      可以使用这个@PostConstruct 方法自动设置配置,该方法接收NewTopic bean。

      
          @Autowired
          private Set<NewTopic> topics;
      
          @PostConstruct
          public void reconfigureTopics() throws ExecutionException, InterruptedException {
      
              try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
                  adminClient.incrementalAlterConfigs(topics.stream()
                      .filter(topic -> topic.configs() != null)
                      .collect(Collectors.toMap(
                          topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
                          topic -> topic.configs().entrySet()
                              .stream()
                              .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                              .peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
                              .map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
                              .collect(Collectors.toList())
                      )))
                      .all()
                      .get();
              }
      
          }
      

      【讨论】:

      • 感谢您提供示例代码。 @Sergi 如果我们尝试更改不存在主题的保留,alterConfigsResult.all() 是否会引发任何异常?我们如何知道我们是否正在修改现有主题?
      • 更新现有主题的配置不会引发任何异常。您可以使用 describeConfigs 方法获取现有主题的当前配置
      【解决方案3】:

      要使用AdminClient 以编程方式创建具有指定保留时间的主题,请执行以下操作:

      NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor);
      topic.configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString()));
      adminClient.createTopics(List.of(topic));
      

      【讨论】:

        猜你喜欢
        • 2020-01-14
        • 1970-01-01
        • 2022-12-10
        • 2020-09-29
        • 1970-01-01
        • 2021-01-16
        • 1970-01-01
        • 1970-01-01
        • 2019-08-15
        相关资源
        最近更新 更多