【问题标题】:send output of two bolts to a single bolt in Storm?将两个螺栓的输出发送到 Storm 中的单个螺栓?
【发布时间】:2014-05-29 21:46:51
【问题描述】:

将 BoltA 和 BoltB 的输出发送到 BoltC 的最简单方法是什么?我必须使用联接还是有任何更简单的解决方案。 A 和 B 具有相同的字段(ts、metric_name、metric_count)。

    // KafkaSpout --> LogDecoder
    builder.setBolt(LOGDECODER_BOLT_ID, logdecoderBolt, 10).shuffleGrouping(KAFKA_SPOUT_ID);

    // LogDecoder --> CountBolt
    builder.setBolt(COUNT_BOLT_ID, countBolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);

    // LogDecoder --> HttpResCodeCountBolt
    builder.setBolt(HTTP_RES_CODE_COUNT_BOLT_ID, http_res_code_count_bolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);


    # And now I want to send CountBolt and HttpResCodeCountBolt output to Aggregator Bolt.

    // CountBolt --> AggregatwBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((COUNT_BOLT_ID), new Fields("ts"));

    // HttpResCodeCountBolt --> AggregatwBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), new Fields("ts"));

这可能吗?

【问题讨论】:

    标签: apache-storm


    【解决方案1】:

    是的。只需在 fieldsGrouping 调用中添加一个流 ID(下面的“stream1”和“stream2”):

    BoltDeclarer bd = builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5); 
    bd.fieldsGrouping((COUNT_BOLT_ID), "stream1",  new Fields("ts"));
    bd.fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), "stream2", new Fields("ts"));
    

    然后在 BoltC 的 execute() 方法中,您可以测试以查看元组来自哪个流:

    public void execute(Tuple tuple) {
    
        if ("stream1".equals(tuple.getSourceStreamId())) {
            // this came from stream1
        } else if ("stream2".equals(tuple.getSourceStreamId())) {
            // this came from stream2
        }
    

    由于您知道元组来自哪个流,因此您不需要在两个流上具有相同形状的元组。您只需根据流 id 对元组进行解组。

    您还可以检查元组来自哪个组件(当我键入此内容时,我认为这可能更适合您的情况)以及发出元组的组件(任务)的实例。

    【讨论】:

    • 如果你像上面的代码一样改变你的代码,它会在线程“main”java.lang.IllegalArgumentException中抛出类似异常的错误:Bolt has been declared for id
    • @Mouli 我遇到了同样的错误。你找到解决办法了吗?
    • @Inderpal 编辑了我的答案,为螺栓创建了一个 BoltDeclarer,然后使用该引用来定义两个输入流。不确定此代码何时停止工作。
    【解决方案2】:

    正如@Chris 所说,您可以使用流。但是您也可以简单地从元组中获取源组件。

    @Override
    public final void execute(final Tuple tuple) {
        final String sourceComponent = tuple.getSourceComponent();
        ....
    }
    

    源组件是您在拓扑初始化时为 Bolt 指定的名称。例如:COUNT_BOLT_ID。

    【讨论】:

      猜你喜欢
      • 2014-07-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-29
      相关资源
      最近更新 更多