【问题标题】:How to send multiple (different) tuples from one KafkaSpout at once to the bolt?如何一次将多个(不同)元组从一个 KafkaSpout 发送到螺栓?
【发布时间】:2019-07-26 12:43:33
【问题描述】:

我是 Apache Storm 的新手。

我正在尝试使用 Apache Kafka、Storm 和 ESPER CEP 引擎开发一个实时流处理系统。

为此,我有一个 KafkaSpout,它将向 Bolts(具有我的 CEP 查询)发出流以过滤流。

我已经创建了一个拓扑,我正在尝试在本地集群上运行它

问题是在我的 bolts 中运行的 CEP 查询需要成批的元组来对流执行窗口操作。在我的拓扑中,KafkaSpout 一次只向 Bolts 发送一个元组进行处理。所以我的 CEP 查询没有按预期工作。

我在 Storm 中使用默认的 KafkaSpout。有什么办法可以一次将多个不同的元组发送到螺栓?一些配置调整可以做到这一点,还是我需要为此制作我的自定义 KafkaSpout?

请帮忙!!

我的拓扑:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("KafkaSpout", new KafkaSpout(KafkaSpoutConfig.builder("localhost:" + 9092, "weatherdata").setProp(ConsumerConfig.GROUP_ID_CONFIG, "weather-consumer-group").build() ),4);

builder.setBolt("A", new FeatureSelectionBolt(), 2).globalGrouping("KafkaSpout");

builder.setBolt("B", new TrendDetectionBolt(), 2).shuffleGrouping("A")

我正在使用 2 个螺栓和一个喷口。

我在 Bolt A 中运行的 esper 查询是

select first(e), last(e) from weatherEvent.win:length(3) as e

在这里,我试图从事件流中获取长度为 3 的窗口中的第一个和最后一个事件。但是我得到相同的第一个和最后一个事件,因为 KafkaSpout 一次只发送一个元组。

【问题讨论】:

    标签: apache-kafka apache-storm esper bolt spout


    【解决方案1】:

    spout 无法做到这一点,但您可以使用 Storm 的窗口支持 https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html,或者只是编写一个聚合 Bolt 并将其放在 spout 和拓扑的其余部分之间。

    所以你的拓扑应该是spout -> aggregator -> feature selection -> trend detection

    我建议您尝试内置的窗口支持,但如果您更愿意编写自己的聚合,您的 Bolt 真的只需要接收一些元组(例如 3 个),并发出一个包含所有值。

    聚合器螺栓应该做类似的事情

    private List<Tuple> buffered;
    
    execute(Tuple input) {
      if (buffered.size != 2) {
        buffered.add(input)
        return
      }
      Tuple first = buffered.get(0)
      Tuple second = buffered.get(1)
      Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
      List<Tuple> anchors = List.of(first, second, input)
      collector.emit(anchors, aggregate)
      collector.ack(first, second, input)
      buffered.clear()
    }
    

    这样你最终会得到一个包含 3 个输入元组内容的元组。

    【讨论】:

    • 我正在指定窗口长度和滑动窗口的长度,例如:builder.setBolt("FeatureSelectionBolt", new FeatureSelectionBolt().withWindow(new BaseWindowedBolt.Count(3), new BaseWindowedBolt.Count(5) )), 2) 但我仍然收到异常 java.lang.IllegalArgumentException: Window length is not specified org.apache.storm.topology.WindowedBoltExecutor.validate(WindowedBoltExecutor.java:126) ~[storm-core-1.2.1. jar:1.2.1] org.apache.storm.topology.WindowedBoltExecutor.initWindowManager(WindowedBoltExecutor.java:200) ~[storm-core-1.2.1.jar:1.2.1] 这里有什么问题?
    • 不确定,我觉得还不错。我假设 FeatureSelectionBolt 扩展了 BaseWindowedBolt?而且你没有用任何你不调用 .withWindow on 的螺栓来扩展 BaseWindowedBolt?
    • 是的,FeatureSelectionBolt 扩展了 BaseWindowedBolt。此外,我没有任何不扩展 BaseWindowedBolt 并且仍然调用 .withWindow() 的螺栓。所以不确定是什么导致了问题。仅供参考,我还有两个螺栓(扩展 BaseRichBolt 而不是 BaseWindowedBolt),它们从 FeatureSelection Bolt 获取元组。另外,我正在将我的拓扑提交到本地集群。我希望这不会造成问题
    • 我不确定问题出在哪里,但如果您使用本地集群运行,则可以很容易地对其进行调试。异常来自github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org/…,因为 windowLengthCount 为空。仅当 github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org/… 为假时才会发生这种情况。
    • 当您在此处调用 .withWindow github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org/… 时,配置会添加到stormConf 映射中。尝试调试以查看是否可以发现该配置不在stormConf 中的原因。我会看看我是否可以在 LocalCluster 中运行我们的示例窗口拓扑。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-07-29
    • 1970-01-01
    • 2019-08-29
    • 1970-01-01
    • 2016-04-19
    • 1970-01-01
    相关资源
    最近更新 更多