【发布时间】:2020-04-26 00:01:16
【问题描述】:
<int-kafka:outbound-channel-adapter sync="false"
kafka-template="kafkaTemplate" id="kafkaOutboundChannelAdapter"
topic="learning-topic" channel="KafkaAdapterChan"
send-failure-channel="FailureChan">
</int-kafka:outbound-channel-adapter>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="xyz:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
以上示例适用于快乐路径。但是,当 kafka 集群重启时,我看到重启失败后的第一条消息,并显示错误“不是领导者”...请求更新元数据。
一旦重启后第一条消息的元数据得到更新,后面的消息就会正确发布。所以我正在寻找是否有任何方法可以在 kafka 集群重启时重新实例化或更新 kakfa 生产者出站通道适配器的元数据?
【问题讨论】:
-
什么版本的Kafka,Spring集成Kafka?请显示一些日志;我认为当前版本没有此类问题;我每分钟发送一次消息;重启代理;在日志中看到一些错误
Broker may not be available.;代理出现时出现一些错误Error while fetching metadata,但下一次发送成功。 -
Confluent kafka 5.1.1,spring-integration-kafka: 3.1.3.RELEASE.
-
尝试使用更新版本的 Confluent - 我使用 Kafka 2.4.1 进行了测试(我认为应该是 Confluent 5.4.1)。我也使用了 SIK 3.2.1。
-
Confluent kafka 5.1.1,spring-integration-kafka:3.1.3.RELEASE。日志:[kafka-producer-network-thread | producer -1]:由于 org.apache.kafka.common.errors.NotLeaderForPartitionException,在分区 **** 上的生产请求中收到无效的元数据错误。此服务器不是该主题分区的领导者。现在将请求元数据更新。
-
所以这是我遵循的步骤。 1. 使用 producer1 应用成功生成消息。 2.停止kafka集群。 3. 使用 producer1 应用程序查看错误。 4.启动kafka集群。 5.在消息发布之前 producer1 应用中不再出现错误。 6. 只是为了确保 kafka 集群是否完全启动,创建新的 producer2 应用程序并成功发布。 7. 但我仍然看到来自 producer1 应用程序的第一条消息的此错误,后来每条消息都被正确发布。
标签: spring-integration spring-kafka