【问题标题】:how can I achieve fields grouping between KafkaSpout and the bolts如何实现 KafkaSpout 和螺栓之间的字段分组
【发布时间】:2015-11-27 02:01:56
【问题描述】:

我一直在向 Kafka 主题发送消息,这些消息在主题中采用 JSON 格式,我使用 KafkaSpout 从 Kafka 获取消息并使用随机分组将其发送到螺栓。现在我想实现KafkaSpout 和bolt 之间的字段分组。请任何人都可以帮助我。如何实现KafkaSpout 和螺栓之间的字段分组。

【问题讨论】:

    标签: apache-kafka apache-storm


    【解决方案1】:

    你需要实现backtype.storm.spout.scheme接口,基本上看起来是这样的:

    public class FooScheme implements Scheme {
    
      public Values deserialize(final byte[] _line) {
    
         try{
               Values values = new Values();
               JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line));
               values.add((String) msg.get("a"));
               values.add((String) msg.get("b"))
               values.add(msg)
            }
            catch(ParseException e) {
                //handle the exception
                return null;
            }
    
      }
    
      public Fields getOutputFields() {
         return new Fields("a", "b", "json");}
    }
    

    然后你可以像这样在你的喷口上使用它:

    SpoutConfig spoutConfig = new SpoutConfig( ... your config here ...);
    spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    topology.setSpout("kafka-spout", 1).setNumTasks(1);
    

    现在您可以准备使用按“a”或“b”或两者分组的字段了。

    FooBolt bolt = new FooBolt();
    topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1)
             .fieldsGrouping("kafka-spout", new Fields("a","b"));
    

    享受

    【讨论】:

    • 我曾尝试使用您的代码示例来实现,但我可能会收到以下错误 --- java.lang.IllegalArgumentException: Tuple created with wrong number of fields。预计有 1 个字段,但在 backtype.storm.tuple.TupleImpl.(TupleImpl.java:55) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0. 0-2041]
    • @RishiArora getOutputFields 中的字段数是否与反序列化中的值数相同?
    • public Values deserialize(final byte[] _line) { // TODO 自动生成的方法存根 try { Values values = new Values(); JSONObject msg = (JSONObject) JSONValue .parseWithException(new String(_line)); //values.add((String) msg​​.get("id")); values.add(msg);返回值;
    • @Override public Fields getOutputFields() { // TODO 自动生成的方法存根 return new Fields("id"); }
    • @RishiArora 更改 getOutputFields() 以返回 new Fields("id", "json");您将返回两个字段,id 和消息本身。
    猜你喜欢
    • 1970-01-01
    • 2019-07-26
    • 2014-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多