【问题标题】:spring integration kafka outbound adapter producer channel update metadataspring 集成 kafka 出站适配器生产者通道更新元数据
【发布时间】:2020-04-26 00:01:16
【问题描述】:
    <int-kafka:outbound-channel-adapter sync="false" 
        kafka-template="kafkaTemplate" id="kafkaOutboundChannelAdapter" 
        topic="learning-topic" channel="KafkaAdapterChan" 
        send-failure-channel="FailureChan">
    </int-kafka:outbound-channel-adapter>

    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="xyz:9092" />
                        ... <!-- more producer properties -->
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
    </bean>

以上示例适用于快乐路径。但是,当 kafka 集群重启时,我看到重启失败后的第一条消息,并显示错误“不是领导者”...请求更新元数据。

一旦重启后第一条消息的元数据得到更新,后面的消息就会正确发布。所以我正在寻找是否有任何方法可以在 kafka 集群重启时重新实例化或更新 kakfa 生产者出站通道适配器的元数据?

【问题讨论】:

  • 什么版本的Kafka,Spring集成Kafka?请显示一些日志;我认为当前版本没有此类问题;我每分钟发送一次消息;重启代理;在日志中看到一些错误Broker may not be available.;代理出现时出现一些错误Error while fetching metadata,但下一次发送成功。
  • Confluent kafka 5.1.1,spring-integration-kafka: 3.1.3.RELEASE.
  • 尝试使用更新版本的 Confluent - 我使用 Kafka 2.4.1 进行了测试(我认为应该是 Confluent 5.4.1)。我也使用了 SIK 3.2.1。
  • Confluent kafka 5.1.1,spring-integration-kafka:3.1.3.RELEASE。日志:[kafka-producer-network-thread | producer -1]:由于 org.apache.kafka.common.errors.NotLeaderForPartitionException,在分区 **** 上的生产请求中收到无效的元数据错误。此服务器不是该主题分区的领导者。现在将请求元数据更新。
  • 所以这是我遵循的步骤。 1. 使用 producer1 应用成功生成消息。 2.停止kafka集群。 3. 使用 producer1 应用程序查看错误。 4.启动kafka集群。 5.在消息发布之前 producer1 应用中不再出现错误。 6. 只是为了确保 kafka 集群是否完全启动,创建新的 producer2 应用程序并成功发布。 7. 但我仍然看到来自 producer1 应用程序的第一条消息的此错误,后来每条消息都被正确发布。

标签: spring-integration spring-kafka


【解决方案1】:

我有一种方法可以处理此异常并重试,但正在寻找更好的方法来处理此错误。

您是否尝试过设置ProducerConfig.RETRIES_CONFIG

您还可以将Retry Advice 添加到适配器,框架将执行重试。

您可以在适配器上设置sync="true",以便在调用线程上引发异常,并且重试建议将执行重试。

否则,您需要在 FailureChan 上使用自己的重试逻辑。

【讨论】:

  • 如果错误以异步方式返回,您需要sync=true 才能使用重试建议。
  • 谢谢你会尝试上述选项,也只是检查是否有办法手动触发刷新kafka生产者连接或触发元数据更新,可能会在jmx中公开该方法(如果存在)?
  • 不建议对大容量应用进行同步。你可以在DefaultKafkaProducerFactory 上调用reset()(如果你知道集群被弹回了)。下一次发送将使用新的Producer
  • 谢谢 Gary,我有一个健康检查代码配置为知道 kafka 被退回。所以我会尝试这个重置选项。
猜你喜欢
  • 2015-07-11
  • 1970-01-01
  • 2014-12-07
  • 2023-03-21
  • 2013-04-23
  • 2014-09-19
  • 1970-01-01
  • 2011-08-29
  • 1970-01-01
相关资源
最近更新 更多