【问题标题】:Kafka spout integration卡夫卡喷口集成
【发布时间】:2017-07-07 09:16:09
【问题描述】:

我正在使用 kafka 0.10.1.1 和storm 1.0.2。在 kafka 集成的storm文档中,我可以看到偏移量仍然使用zookeeper维护,因为我们正在使用zookeeper服务器初始化kafka spout。 我如何使用kafka服务器引导spout。有没有这方面的例子。 Storm 文档中的示例

    BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

这个使用 zookeeper 的选项工作正常并且正在使用消息。但我无法在 kafkamanager ui 中将消费者组或风暴节点视为消费者。

尝试过的替代方法是这样的。

KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();

KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);

 private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {

        Map<String, Object> props = new HashMap<>();
        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");

        String[] topics = new String[1];
        topics[0] = topicName;

        KafkaSpoutStreams kafkaSpoutStreams =
                new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build();

        KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
                new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build();

        KafkaSpoutConfig<String, String> spoutConf =
                new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();

        return spoutConf;
    }

但是这个解决方案在从 kafka 读取几条消息后显示 CommitFailedException。

【问题讨论】:

    标签: apache-kafka apache-storm


    【解决方案1】:

    Storm-kafka 使用普通的 kafka 客户端在 zookeeper 中以不同的位置和不同的格式写入消费者信息。所以在kafkamanager ui中是看不到的。

    你可以找到一些其他的监控工具,比如 https://github.com/keenlabs/capillary.

    【讨论】:

      【解决方案2】:

      在您的替代方法中,您可能会收到CommitFailedException,原因是:

      props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
      

      直到 Storm 2.0.0-SNAPSHOT(以及自 1.0.6 起)- KafkaConsumer 自动提交不受支持

      来自文档:

      请注意,不支持 KafkaConsumer 自动提交。这 KafkaSpoutConfig 构造函数将抛出异常,如果 “enable.auto.commit”属性被设置,消费者使用的 spout 将始终将该属性设置为 false。你可以配置 通过 setProcessingGuarantee 自动提交的类似行为 KafkaSpoutConfig 构建器上的方法。

      参考文献:

      【讨论】:

        猜你喜欢
        • 2016-09-06
        • 1970-01-01
        • 2015-06-18
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-04-11
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多