【问题标题】:How to notify consumer that new topic has been created in Kafka?如何通知消费者在 Kafka 中创建了新主题?
【发布时间】:2017-10-10 23:34:02
【问题描述】:

我正在尝试让我的消费者动态更新其消费。

让我给你一个使用动物的更具体的例子。想象一下,我有一家宠物店,每个主题都是一种动物(例如狗、猫、鱼)。我的 Kafka 消费者的主要职责是获取我们在 Kafka 中拥有的任何日志/记录/消息并将它们存储到数据库中。

假设我的消费者正在积极消费dogscats 主题并且一切正常,现在有一种新型动物进入商店,并且在 Kafka 集群中生成了一个新主题。如何通知我的消费者添加了新主题?

我有两个建议,我想看看你认为哪个更好?或者如果有更好的第三个选项,请告诉我。

1.) 生产者向消费者发送一个http请求,让消费者知道生产者将要创建一个新主题,以便消费者采取相应的行动。这种方法的问题在于,存在竞争条件。消费者有可能会在创建主题之前尝试消费。 (实际上我刚刚发现,如果我将 auto.topic.creation.enable 设置为 true,那么竞态条件就不是问题了。)

2.) 在 Kafka 集群中创建一个名为 topic_updates 的额外主题。所以每当生产者成功地向 Kafka 集群提交了一条消息,它就会通过这个topic_updates 广播消息,也许一个简单的字符串就可以了。消费者正在积极收听此主题更新。

3.) 我不知道,理想情况下,我希望 Kafka 能够在创建新主题时发出事件。

提前谢谢你

【问题讨论】:

    标签: apache-kafka sarama


    【解决方案1】:

    消费者可以自动找到新创建的主题,您只需调用consumer.subscribe(Pattern.compile(".*"));即可订阅所有主题

    可以降低metadata.max.age.ms 以让消费者更快地了解新主题。

    【讨论】:

    • 这对我不起作用。正如我尝试过的kafka-console-consumer --bootstrap-server localhost:9092 --whitelist "mytopics.*" --consumer-property metadata.max.age.ms=5000kafka-console-producer --broker-list localhost:9092 --topic mytopics.1234。我仍然得到[2019-12-20 15:00:59,080] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {mytopics.1234=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    【解决方案2】:

    您可以使用新的 KafkaAdminClient 并以某种方式监控主题列表并检查新添加的内容。这是一个示例代码,可为您提供主题列表(不包括内部主题):

    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    KafkaAdminClient kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
    ListTopicsResult listTopicResult = kafkaAdminClient.listTopics();
    System.out.println(listTopicResult.names().get().toString());
    

    【讨论】:

    • vbnvbnvbn hgghghf h hg
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-12
    • 2020-06-03
    • 1970-01-01
    • 2020-04-02
    相关资源
    最近更新 更多