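【Posted】: 2018-04-15 22:15:17
【Question】:
I am trying to modify the word-count example in Storm: instead of printing the results, I want to send them to a Kafka cluster. This is the code that builds the topology: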
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("kafka.broker.config", props);
KafkaBolt bolt = new KafkaBolt()
        .withTopicSelector(new DefaultTopicSelector("tt"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper())
        .withProducerProperties(props);
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "", "id1");
spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaBoltKeyValueScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader-spout", kafkaSpout);
builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");
builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");
builder.setBolt("forwardToKafka", bolt,1).shuffleGrouping("word-counter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("HelloStorm", config, builder.createTopology());
As you can see, I want to send the output of the word-counter bolt to a Kafka topic. However, I get this exception:
java.lang.IllegalArgumentException: message does not exist
at org.apache.storm.tuple.Fields.fieldIndex(Fields.java:95) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.getMessageFromTuple(FieldNameBasedTupleToKafkaMapper.java:46) ~[storm-kafka-1.0.5.jar:1.0.5]
at org.apache.storm.kafka.bolt.KafkaBolt.process(KafkaBolt.java:120) [storm-kafka-1.0.5.jar:1.0.5]
at org.apache.storm.topology.base.BaseTickTupleAwareRichBolt.execute(BaseTickTupleAwareRichBolt.java:38) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$fn__4963$tuple_action_fn__4965.invoke(executor.clj:731) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__4884.invoke(executor.clj:461) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.disruptor$clojure_handler$reify__4398.onEvent(disruptor.clj:40) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:453) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:432) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$fn__4963$fn__4976$fn__5029.invoke(executor.clj:850) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.0.5.jar:1.0.5]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
7793 [Thread-18-forwardToKafka-executor[2 2]] INFO o.a.s.d.executor - BOLT fail TASK: 2 TIME: -1 TUPLE: source: word-counter:4, stream: default, id: {}, [发布]
In the word-counter bolt, I do this:
String str = input.getString(0);
collector.emit(new Values(str));
collector.ack(input);
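The top frames of the trace show `FieldNameBasedTupleToKafkaMapper.getMessageFromTuple` calling `getValueByField`, and the exception text names the field `message`, so the mapper appears to look up tuple fields by name. The sketch below imitates that kind of lookup in plain Java, without Storm. `FieldLookupSketch`, `valueByField`, and the declared field name `word` are hypothetical: the question does not show what `WordCounterBolt` declares in `declareOutputFields`.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch with no Storm dependency; all names here are hypothetical.
final class FieldLookupSketch {

    // Returns the value stored under `name`, or throws with a message shaped
    // like the one in the trace when the declared fields lack that name.
    static Object valueByField(List<String> declaredFields, List<Object> values, String name) {
        int index = declaredFields.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException(name + " does not exist");
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.<Object>asList("hello");

        // Assumed case: the upstream bolt declared its single field as "word".
        try {
            valueByField(Arrays.asList("word"), values, "message");
        } catch (IllegalArgumentException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }

        // Same value, but declared under the name the lookup asks for.
        System.out.println(valueByField(Arrays.asList("message"), values, "message"));
    }
}
```

If this is what happens in the topology, two things worth trying are declaring the counter bolt's output field as `message`, or passing the actual field names to the two-argument `FieldNameBasedTupleToKafkaMapper(keyField, messageField)` constructor.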
【Comments】:
Tags: java apache-kafka apache-storm