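【Posted】: 2020-01-18 08:10:14
【Question】:
I have a simple Storm topology that reads data from Kafka, then parses the messages and extracts their fields. I want to filter the tuple stream by the value of one field and run a count aggregation on the value of another field. How can I do this in Storm? I haven't found corresponding methods on tuples (filter, aggregate), so should I apply these functions directly to the field values?
Here is the topology: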
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
topologyBuilder.setBolt("parser_bolt", new ParserBolt()).shuffleGrouping("kafka_spout")
topologyBuilder.setBolt("transformer_bolt", new KafkaTwitterBolt()).shuffleGrouping("parser_bolt")
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
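I have set up KafkaTwitterBolt to do the counting and filtering on the parsed fields. So far I have only managed to filter the whole list of values, not a specific field: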
class KafkaTwitterBolt() extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val tweetValues = input.getValues.asScala.toList
    val filterTweets = tweetValues
      .map(_.toString)
      .filter(_ contains "big data")
    val resultAllValues = new Values(filterTweets)
    collector.emit(resultAllValues)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("created_at", "id", "text", "source", "timestamp_ms",
      "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
      "user.friends_count", "user.lang", "user.favorite_count", "entities.hashtags"))
  }
}
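For illustration, one possible way to filter on a single field and count on another is to read the fields by name inside two small bolts. This is only a sketch under assumptions: it assumes the `org.apache.storm` package layout (Storm 1.x or later), that ParserBolt declares the fields `text` and `user.lang`, and the names `TweetFilter`, `TextFilterBolt`, and `LangCountBolt` are hypothetical. The counts live in memory per task and are not fault tolerant.

```scala
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.tuple.{Fields, Tuple, Values}
import scala.collection.mutable

// Hypothetical helper: the filter predicate, kept free of Storm types.
object TweetFilter {
  def matches(value: Any, keyword: String): Boolean =
    value != null && value.toString.contains(keyword)
}

// Hypothetical bolt: passes a tuple on only if its "text" field contains the keyword.
class TextFilterBolt(keyword: String) extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    // Read one field by name instead of scanning every value in the tuple.
    val text = input.getValueByField("text")
    if (TweetFilter.matches(text, keyword)) {
      // Emit only the fields the downstream bolt needs.
      collector.emit(new Values(text, input.getValueByField("user.lang")))
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("text", "user.lang"))
}

// Hypothetical bolt: keeps a running count per "user.lang" value.
class LangCountBolt extends BaseBasicBolt {
  // In-memory state, local to each task; lost if the worker restarts.
  private val counts = mutable.Map.empty[String, Long]

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val lang = String.valueOf(input.getValueByField("user.lang"))
    val updated = counts.getOrElse(lang, 0L) + 1L
    counts(lang) = updated
    // Emit the key together with its updated count.
    collector.emit(new Values(lang, Long.box(updated)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("user.lang", "count"))
}
```

If bolts like these were wired into the topology above, the counting bolt would probably need `fieldsGrouping("filter_bolt", new Fields("user.lang"))` rather than `shuffleGrouping`, so that all tuples with the same `user.lang` value reach the same task and each count is kept in one place. The component id `filter_bolt` is likewise an assumption.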
【Comments】:
Tags: filter apache-storm topology