【发布时间】:2015-11-27 02:01:56
【问题描述】:
我一直在向 Kafka 主题发送消息,这些消息在主题中采用 JSON 格式,我使用 KafkaSpout 从 Kafka 获取消息并使用随机分组将其发送到螺栓。现在我想实现KafkaSpout 和bolt 之间的字段分组。请任何人都可以帮助我。如何实现KafkaSpout 和螺栓之间的字段分组。
【问题讨论】:
我一直在向 Kafka 主题发送消息,这些消息在主题中采用 JSON 格式,我使用 KafkaSpout 从 Kafka 获取消息并使用随机分组将其发送到螺栓。现在我想实现KafkaSpout 和bolt 之间的字段分组。请任何人都可以帮助我。如何实现KafkaSpout 和螺栓之间的字段分组。
【问题讨论】:
你需要实现backtype.storm.spout.scheme接口,基本上看起来是这样的:
public class FooScheme implements Scheme {
public Values deserialize(final byte[] _line) {
try{
Values values = new Values();
JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line));
values.add((String) msg.get("a"));
values.add((String) msg.get("b"))
values.add(msg)
}
catch(ParseException e) {
//handle the exception
return null;
}
}
public Fields getOutputFields() {
return new Fields("a", "b", "json");}
}
然后你可以像这样在你的喷口上使用它:
SpoutConfig spoutConfig = new SpoutConfig( ... your config here ...);
spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
topology.setSpout("kafka-spout", 1).setNumTasks(1);
现在您可以准备使用按“a”或“b”或两者分组的字段了。
FooBolt bolt = new FooBolt();
topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1)
.fieldsGrouping("kafka-spout", new Fields("a","b"));
享受
【讨论】: