【问题标题】:Camel-Kafka Zookeeper ExceptionCamel-Kafka Zookeeper 异常
【发布时间】:2017-05-17 12:16:35
【问题描述】:

我正在启动一个骆驼卡夫卡消费者,它从主题中获取所有数据并移动到哈希图。 camel-Kafka 的 Jar 版本是 2.18.1

cxfbeans.xml文件如下:

    <route id="Test1" streamCache="true">
        <from uri="file:C:/data" />
        <split streaming="true">
            <tokenize token="\n" />
            <to uri="bean:proc1" />
            <to
                uri="kafka:localhost:9092?topic=Checking&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;serializerClass=kafka.serializer.StringEncoder" />
        </split>
    </route>

如果我部署战争,我会得到以下异常:

原因:org.apache.camel.ResolveEndpointFailedException:无法解析端点:kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=Checking&zookeeperHost=localhost&zookeeperPort=2181 由于:有2个参数无法在端点上设置。如果参数拼写正确并且它们是端点的属性,请检查 uri。未知参数=[{zookeeperHost=localhost,zookeeperPort=2181}]

我尝试删除 zookeeper 的端口和主机,它已部署,但消费者没有消费并放置在处理器中。

谁能帮我解决这个问题。我将版本降级为,但我需要在 to 标记中指定一个反序列化器类。

【问题讨论】:

    标签: apache-camel apache-kafka


    【解决方案1】:

    使用 camel-kafka 时,版本 =2.17 存在差异。

    你用的是适合2.16的。

    对于 >=2.17,请参阅 http://camel.apache.org/kafka.html 2.17 或更新的部分。

    这里有一个例子:

    from("direct:start").process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class);
                            exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
                            exchange.getIn().setHeader(KafkaConstants.KEY, "1");
                        }
                    }).to("kafka:localhost:9092?topic=test");
    

    【讨论】:

    • @Mobilty,这是否意味着,如果我使用最新的骆驼,不需要 zookeeper 主机和端口参数。骆驼怎么会知道对应的zookeeper信息?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-13
    • 2019-01-12
    • 2017-09-12
    相关资源
    最近更新 更多