【发布时间】:2019-07-26 12:43:33
【问题描述】:
我是 Apache Storm 的新手。
我正在尝试使用 Apache Kafka、Storm 和 ESPER CEP 引擎开发一个实时流处理系统。
为此,我有一个 KafkaSpout,它将向 Bolts(具有我的 CEP 查询)发出流以过滤流。
我已经创建了一个拓扑,我正在尝试在本地集群上运行它
问题是在我的 bolts 中运行的 CEP 查询需要成批的元组来对流执行窗口操作。在我的拓扑中,KafkaSpout 一次只向 Bolts 发送一个元组进行处理。所以我的 CEP 查询没有按预期工作。
我在 Storm 中使用默认的 KafkaSpout。有什么办法可以一次将多个不同的元组发送到螺栓?一些配置调整可以做到这一点,还是我需要为此制作我的自定义 KafkaSpout?
请帮忙!!
我的拓扑:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("KafkaSpout", new KafkaSpout(KafkaSpoutConfig.builder("localhost:" + 9092, "weatherdata").setProp(ConsumerConfig.GROUP_ID_CONFIG, "weather-consumer-group").build() ),4);
builder.setBolt("A", new FeatureSelectionBolt(), 2).globalGrouping("KafkaSpout");
builder.setBolt("B", new TrendDetectionBolt(), 2).shuffleGrouping("A")
我正在使用 2 个螺栓和一个喷口。
我在 Bolt A 中运行的 esper 查询是
select first(e), last(e) from weatherEvent.win:length(3) as e
在这里,我试图从事件流中获取长度为 3 的窗口中的第一个和最后一个事件。但是我得到相同的第一个和最后一个事件,因为 KafkaSpout 一次只发送一个元组。
【问题讨论】:
标签: apache-kafka apache-storm esper bolt spout