【问题标题】:Error using Kafkamessagedriven channel adapter使用 Kafkamessagedriven 通道适配器时出错
【发布时间】:2015-09-03 16:15:17
【问题描述】:

我正在使用 spring-integration kafka 与 kafka 交谈。

以下是我使用消息驱动通道适配器所做的配置。

<!-- kafka MessageDriven Channel Adapter for ProcessEvent -->
    <int-kafka:message-driven-channel-adapter
        listener-container="listnerContainer" payload-decoder="kafkaReflectionDecoder"
        key-decoder="kafkaReflectionDecoder" channel="storeOffsetsChannel"
        auto-startup="true"/>


    <bean id="zkConfiguration"
        class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
        <constructor-arg ref="zookeeperConnect"></constructor-arg>
    </bean>
    <bean id="kafkaConnectionFactory"
        class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
        <constructor-arg ref="zkConfiguration"></constructor-arg>
    </bean>
    <bean id="listnerContainer"
        class="org.springframework.integration.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg ref="kafkaConnectionFactory"></constructor-arg>
        <constructor-arg value="${listed.accounts.topic}"></constructor-arg>
    </bean>

<!-- Zookeeper connect needed for Kafka Consumer -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="${app.zookeeper.servers}" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />

当我启动我的应用程序时,应用程序不会启动并出现以下错误。但有时应用程序启动正常

2015-09-03 11:53:32.647 ERROR 28883 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter#0'; nested exception is kafka.common.KafkaException: fetching topic metadata for topics [Set(fulfillment.payments.autopay.listeddueaccounts)] from broker [List()] failed
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:770)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:140)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:118)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:686)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:320)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:957)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:946)
    at com.capitalone.payments.autopay.autopayprocesstransrecon.AutopayProcessTransactionRecon.main(AutopayProcessTransactionRecon.java:26)
Caused by: kafka.common.KafkaException: fetching topic metadata for topics [Set(fulfillment.payments.autopay.listeddueaccounts)] from broker [List()] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at org.springframework.integration.kafka.core.DefaultConnectionFactory.refreshMetadata(DefaultConnectionFactory.java:178)
    at org.springframework.integration.kafka.core.DefaultConnectionFactory.getPartitions(DefaultConnectionFactory.java:221)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer$GetPartitionsForTopic.safeValueOf(KafkaMessageListenerContainer.java:611)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer$GetPartitionsForTopic.safeValueOf(KafkaMessageListenerContainer.java:600)
    at com.gs.collections.impl.block.function.checked.CheckedFunction.valueOf(CheckedFunction.java:30)
    at com.gs.collections.impl.utility.ArrayIterate.flatCollect(ArrayIterate.java:933)
    at com.gs.collections.impl.utility.ArrayIterate.flatCollect(ArrayIterate.java:919)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer.getPartitionsForTopics(KafkaMessageListenerContainer.java:332)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer.start(KafkaMessageListenerContainer.java:294)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.doStart(KafkaMessageDrivenChannelAdapter.java:137)
    at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:94)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
    ... 13 common frames omitted

我的消息驱动适配器配置错误吗?

还要补充一点,我在我的应用程序中配置了 int-kafka:inbound-channel-adapter,它查看不同的主题,该适配器的配置如下:

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter" channel="domainEventChannel"
    kafka-consumer-context-ref="consumerContext" auto-startup="true">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS"></int:poller>
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContext"
    zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"
    consumer-timeout="1000">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
            group-id="autopayProcessTransactionRecon" value-decoder="kafkaReflectionDecoder"
            key-decoder="kafkaReflectionDecoder" max-messages="1">
            <int-kafka:topic streams="2" id="${domain.event.topic}" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>


<!-- Zookeeper connect needed for Kafka Consumer -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="${app.zookeeper.servers}" zk-connection-timeout="6000"
    zk-session-timeout="6000" zk-sync-time="2000" />

【问题讨论】:

    标签: spring-integration apache-kafka kafka-consumer-api


    【解决方案1】:

    基于

    经纪人 [List()]

    跟踪的一部分,看起来您没有配置主机/代理 ip。您可以考虑使用不同的 Configuration 实现作为 kafkaConnectionFactory 的构造函数 arg,例如

            <bean id="brokerConfiguration" class="org.springframework.integration.kafka.core.BrokerAddressListConfiguration">
                <constructor-arg>
                    <bean class="org.springframework.integration.kafka.core.BrokerAddress">
                        <constructor-arg type="java.lang.String" value="localhost"/>
                    </bean>
                </constructor-arg>
            </bean>
            <bean id="kafkaConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
                <constructor-arg ref="brokerConfiguration"></constructor-arg>
            </bean>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-05-28
      • 2018-07-06
      • 2016-07-19
      • 1970-01-01
      • 1970-01-01
      • 2014-12-04
      • 2015-03-10
      • 1970-01-01
      相关资源
      最近更新 更多