【问题标题】:Filtering data bolt Storm过滤数据螺栓 Storm
【发布时间】:2020-01-18 08:10:14
【问题描述】:

我有一个简单的 Storm 拓扑,它从 Kafka 读取数据,解析和提取消息字段。我想通过其中一个字段值过滤元组流并在另一个字段值上执行计数聚合。我怎样才能在 Storm 中做到这一点? 我还没有找到元组(过滤器、聚合)的相应方法,所以我应该直接在字段值上执行这些函数吗?

这是一个拓扑:

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
topologyBuilder.setBolt("parser_bolt", new ParserBolt()).shuffleGrouping("kafka_spout")
topologyBuilder.setBolt("transformer_bolt", new KafkaTwitterBolt()).shuffleGrouping("parser_bolt")

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

我已经设置了 KafkaTwitterBolt 来使用解析的字段进行计数和过滤。我设法过滤了整个值列表,而不是按特定字段:

class KafkaTwitterBolt() extends BaseBasicBolt{

 override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
  val tweetValues = input.getValues.asScala.toList
  val filterTweets = tweetValues
     .map(_.toString)
     .filter(_ contains "big data")
  val resultAllValues = new Values(filterTweets)
  collector.emit(resultAllValues)
 }

 override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  declarer.declare(new Fields("created_at", "id", "text", "source", "timestamp_ms",
   "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
   "user.friends_count", "user.lang", "user.favorite_count", "entities.hashtags"))
 }
}

【问题讨论】:

    标签: filter apache-storm topology


    【解决方案1】:

    事实证明,Storm 核心 API 不允许这样做,以便对任何应该使用 Trident 的字段执行过滤(它具有内置的过滤功能)。 代码如下所示:

     val tridentTopology = new TridentTopology()
    
        val stream = tridentTopology.newStream("kafka_spout",
          new KafkaTridentSpoutOpaque(spoutConfig))
          .map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
            "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
            "user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
        .filter(new LanguageFilter)
    

    过滤函数本身:

    class LanguageFilter extends BaseFilter{
    
      override def isKeep(tuple: TridentTuple): Boolean = {
        val language = tuple.getStringByField("user.lang")
        println(s"TWEET: $language")
        language.contains("en")
      }
    }
    

    【讨论】:

      【解决方案2】:

      您在https://stackoverflow.com/a/59805582/8845188 的回答有点错误。 Storm 核心 API 确实允许过滤和聚合,您只需要自己编写逻辑即可。

      过滤螺栓只是丢弃一些元组并传递其他元组的螺栓。例如,以下螺栓将根据字符串字段过滤掉元组:

      class FilteringBolt() extends BaseBasicBolt{
      
       override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
        val values = input.getValues.asScala.toList
        if ("Pass me".equals(values.get(0))) {
          collector.emit(values)
        }
        //Emitting nothing means discarding the tuple
       }
      
       override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
        declarer.declare(new Fields("some-field"))
       }
      }
      

      聚合bolt只是一个收集多个元组的bolt,然后发出一个锚定在原始元组中的新聚合元组:

      class AggregatingBolt extends BaseRichBolt {
        List<Tuple> tuplesToAggregate = ...;
        int counter = 0;
      
       override def execute(input: Tuple): Unit = {
        tuplesToAggregate.add(input);
        counter++;
        if (counter == 10) {
          Values aggregateTuple = ... //create a new set of values based on tuplesToAggregate
          collector.emit(tuplesToAggregate, aggregateTuple) //This anchors the new aggregate tuple to all the original tuples, so if the aggregate fails, the original tuples are replayed.
          for (Tuple t : tuplesToAggregate) {
            collector.ack(t); //Ack the original tuples now that this bolt is done with them
            //Note that you MUST emit before you ack, or the at-least-once guarantee will be broken.
          }
          tuplesToAggregate.clear();
          counter = 0;
        }
        //Note that we don't ack the input tuples until the aggregate gets emitted. This lets us replay all the aggregated tuples in case the aggregate fails
       }
      }
      

      请注意,对于聚合,您需要扩展 BaseRichBolt 并手动执行确认,因为您希望延迟确认元组,直到它包含在聚合元组中。

      【讨论】:

        猜你喜欢
        • 2014-07-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多