【问题标题】:How can I emit the result of storm botl to kafka brokers using kafkabolt?如何使用 kafkabolt 将风暴 botl 的结果发送给 kafka 经纪人?
【发布时间】:2018-04-15 22:15:17
【问题描述】:

我正在尝试更改storm中的wordcount示例:我不想显示结果,而是想将其发送到kafka集群。 这是构建拓扑的代码:

 Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//set producer properties.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "1");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("kafka.broker.config", props);
KafkaBolt bolt = new KafkaBolt()
            .withTopicSelector(new DefaultTopicSelector("tt"))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper())
                        .withProducerProperties(props);

    BrokerHosts hosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "", "id1");
    spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaBoltKeyValueScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);     
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("line-reader-spout", kafkaSpout);
    builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");
    builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");
    builder.setBolt("forwardToKafka", bolt,1).shuffleGrouping("word-counter");
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("HelloStorm", config, builder.createTopology());

如您所见,我想将 wordcounter bolt 的结果发送到 kafka 主题。但是,我得到了这个例外:

java.lang.IllegalArgumentException: message does not exist
at org.apache.storm.tuple.Fields.fieldIndex(Fields.java:95) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.getMessageFromTuple(FieldNameBasedTupleToKafkaMapper.java:46) ~[storm-kafka-1.0.5.jar:1.0.5]
at org.apache.storm.kafka.bolt.KafkaBolt.process(KafkaBolt.java:120) [storm-kafka-1.0.5.jar:1.0.5]
at org.apache.storm.topology.base.BaseTickTupleAwareRichBolt.execute(BaseTickTupleAwareRichBolt.java:38) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$fn__4963$tuple_action_fn__4965.invoke(executor.clj:731) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__4884.invoke(executor.clj:461) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.disruptor$clojure_handler$reify__4398.onEvent(disruptor.clj:40) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:453) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:432) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$fn__4963$fn__4976$fn__5029.invoke(executor.clj:850) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.0.5.jar:1.0.5]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

7793 [Thread-18-forwardToKafka-executor[2 2]] 信息 oasdexecutor - BOLT 失败任务:2 时间:-1 元组:源:字计数器:4,流:默认,id:{},[ “发布”}]

在字数统计中,我做到了:

String str = input.getString(0);
collector.emit(new Values(str));
    collector.ack(input);

【问题讨论】:

    标签: java apache-kafka apache-storm


    【解决方案1】:

    尝试在螺栓的public void declareOutputFields(OutputFieldsDeclarer declarer) 函数中添加declarer.declare(new Fields("message")) 喜欢的是:

    public class MessageBolt extends BaseBasicBolt {
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word = (String) input.getValue(0);
            String out = word ;
            collector.emit(new Values(out));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-10
      • 1970-01-01
      • 2013-10-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多