【问题标题】:How to AutoCreate Kafka Topic according to incoming topic pattern through kafkalistener service in springboot Kafka?如何通过springboot Kafka中的kafkalistener服务根据传入的主题模式自动创建Kafka主题?
【发布时间】:2022-01-15 05:13:20
【问题描述】:

我将订阅 kafka 主题模式,例如“topic。*”我的目标是为我收听的每个 kafka 主题创建死信队列。

例如,当我收听名为“topic.1”的主题时,我想自动创建名为“topic.1_deadletter”的死信队列。

到目前为止,我尝试做的事情如下:

我的消费者:

@Component
@Slf4j
public class LibraryEventsConsumer {

    @Autowired
    LibraryEventConsumerConfig libraryEventConsumerConfig;

    @KafkaListener(topicPattern = "kafka.*")
    public void onMessage(String consumerRecord, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception{

        log.info("ConsumerRecord : {}", consumerRecord);

        String deadlettertopic = String.format("%s_deadletter",topic);
        System.out.println(deadlettertopic);
        System.out.println(KafkaHeaders.RECEIVED_TOPIC);

        libraryEventConsumerConfig.getTopic(topic);`

在这里,我尝试使用 getTopic 方法自动创建 kafka 主题。在下面你可以看到 libraryEventConsumer 类:

@Configuration
@EnableKafka
public class LibraryEventConsumerConfig {

    @Bean
    public void getTopic(String topic){
        NewTopic deadlettertopic = TopicBuilder.name(String.format("%s_deadletter",topic))
                .partitions(1)
                .replicas(1)
                .build();
    }
}

不幸的是,这种方法不起作用,我收到以下错误消息:

Parameter 0 of method getTopic in com.kafkalibrary.Config.LibraryEventConsumerConfig required a bean of type 'java.lang.String' that could not be found.

知道如何进行吗?

解决方案示例代码:

对于那些正在寻找相同目标的人,这是我的示例代码:感谢 Gary Russell 的启发。

   private static void createTopic(String topicName, int numPartitions) throws Exception {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:5052,localhost:5053,localhost:5054");
    AdminClient admin = AdminClient.create(config);

    //checking if topic already exists
    boolean alreadyExists = admin.listTopics().names().get().stream()
            .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
    if (alreadyExists) {
        System.out.printf("topic already exits: %s%n", topicName);
    } else {
        //creating new topic
        System.out.printf("creating topic: %s%n", topicName);
        NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
        admin.createTopics(Collections.singleton(newTopic)).all().get();
    }

【问题讨论】:

  • bean从spring config创建一个主题,它不应该采用动态字符串参数。
  • 我明白,这种目标的任何解决方法? @OneCricketeer
  • 您可以使用重新平衡侦听器(或ConsumerSeekAware) - 然后使用AdminClient 检查 DLT 是否存在,如果不存在,则创建它。

标签: java spring-boot apache-kafka spring-kafka


【解决方案1】:

添加一个重新平衡侦听器,或扩展AbstractConsumerSeekAware(或仅实现ConsumerSeekAware)。

public class LibraryEventsConsumer extends AbstractConsumerSeekAware {

然后,在 onPartitionsAssigned() 中使用 AdminClient 检查 DLT 主题是否存在,如果不存在,则创建它。

【讨论】:

  • 使用 AdminClient 我解决了我的问题。
猜你喜欢
  • 1970-01-01
  • 2015-01-18
  • 1970-01-01
  • 2021-09-09
  • 1970-01-01
  • 1970-01-01
  • 2017-10-18
  • 2020-01-22
相关资源
最近更新 更多