【问题标题】:Sending message to Kafka on dynamically created topic gives error LEADER_NOT_AVAILABLE在动态创建的主题上向 Kafka 发送消息会导致错误 LEADER_NOT_AVAILABLE
【发布时间】:2018-02-13 09:51:42
【问题描述】:

我正在尝试使用以下代码在动态创建的主题上发送消息:

@Service
public class KafkaMessageSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        String topicName =  topic + Integer.valueOf(RandomStringUtils.randomNumeric(1, 4))%100;
        kafkaTemplate.send(topicName, payload);
        LOGGER.info("sent payload to topic='{}'", topicName);
    }
}

当我用 topic = 'somethingnew' 调用这个 send 方法时,我得到以下错误,(有趣的是它能够发送一些消息,但对于许多人来说它给出了错误,并且在一段时间后,它开始发送消息关于这些主题)但是当我重新启动服务器并使用 topic = 'somethingnew' 再次调用 send 方法时,它工作正常!卡夫卡有一些潜在的错误吗?我使用的是 Spring Boot 版本:1.5.4.RELEASE。

2018-02-13 09:52:35.927  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2018-02-13 09:52:35.927  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2018-02-13 09:52:36.067  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 0 : {betsyncDataTopicNew123.5=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.272  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.5'
2018-02-13 09:52:36.372  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {betsyncDataTopicNew123.93=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.475  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.93'
2018-02-13 09:52:36.583  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 6 : {betsyncDataTopicNew123.4=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.686  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.4'
2018-02-13 09:52:36.794  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 9 : {betsyncDataTopicNew123.7=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.897  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.7'
2018-02-13 09:52:37.008  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 12 : {betsyncDataTopicNew123.0=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.110  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.0'
2018-02-13 09:52:37.220  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 15 : {betsyncDataTopicNew123.1=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.320  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.1'
2018-02-13 09:52:37.431  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 18 : {betsyncDataTopicNew123.10=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.534  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.10'
2018-02-13 09:52:37.534  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.4'
2018-02-13 09:52:37.642  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 22 : {betsyncDataTopicNew123.71=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.747  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.71'
2018-02-13 09:52:37.859  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 25 : {betsyncDataTopicNew123.2=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.962  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.2'
2018-02-13 09:52:37.963  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.7'
2018-02-13 09:52:38.074  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 29 : {betsyncDataTopicNew123.33=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:38.176  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.33'
2018-02-13 09:52:38.176  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.1'
2018-02-13 09:52:38.285  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 33 : {betsyncDataTopicNew123.17=LEADER_NOT_AVAILABLE}

这只是警告,我们没有收到任何异常,所以不知道消息是否丢失。

【问题讨论】:

    标签: java spring spring-boot spring-kafka


    【解决方案1】:

    使用AdminClient 按需创建主题。

    Spring for Apache Kafka 提供了一个方便的KafkaAdmin,它可以在应用程序上下文中为NewTopic bean 创建主题,但它也可以用于创建AdminClient,因此您可以手动创建主题-docs here。 Spring Boot 2.0 自动配置为你注册一个。

    一旦你有一个AdminClient 实例,使用createTopics() 方法之一。有关更多信息,请参阅AdminClient javadocs。请务必等待CreateTopicsResultFuture&lt;?&gt;s 完成。

    【讨论】:

    【解决方案2】:

    错误Error while fetching metadata with correlation id 0 ... LEADER_NOT_AVAILABLE 是因为发布第一条消息时的topic doesn't exist,如果您的配置允许自动创建主题,则会自动创建。

    上述消息指的是 Kafka 无法获取有关主题、其分区和消费者组的元数据(例如,无法找到每个消费者组的每个分区的偏移量)。

    正如您所观察到的,发送到主题的第二条和后续消息不会引发此错误。

    创建具有随机名称的主题是相当不寻常的,因为消费者需要“找到”主题才能从中消费。总体思路是在使用之前预先创建 Kafka 主题,这将允许您配置分区等重要因素。

    【讨论】:

    • 嗨@StuartLC,我需要为每个feedId动态创建主题并在侦听器端使用topicPattern,我也可以收听动态创建的主题。主要原因是在多个监听器中,一个监听器会接收到一个feed的所有消息,所以我们可以进行负载均衡,使处理互斥。
    • 我目前无法测试的是,我是否丢失了之前的消息。我现在正在测试它。
    猜你喜欢
    • 2019-02-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-13
    • 2017-12-15
    • 2019-02-14
    相关资源
    最近更新 更多