【Question title】: How to use Storm Trident for batching tuples?
【Posted】: 2016-06-22 04:31:37
【Question】:

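I have used Storm before, and I now need more batching capability, so I searched for batch processing in Storm. I found Trident, which does micro-batching in real time.

But somehow I cannot figure out how Trident handles micro-batching (flow, batch size, batch interval), so I cannot tell whether it really has what I need.

What I want to do is collect/save the tuples emitted by a spout over some interval and re-emit them to a downstream component/bolt/function on a different interval. (For example, the spout emits one tuple per second, and the next Trident function collects/saves the tuples and emits 50 tuples per minute to the next function.)

Can somebody guide me on how to apply Trident in this case? Or is there any other applicable way using Storm features?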

【Comments】:

    Tags: tuples apache-storm trident batching


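    【Answer 1】:

    Excellent question! Unfortunately, Trident does not support this kind of micro-batching.

    But you can try implementing your own frequency-driven micro-batching. Something like this skeleton example: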

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MicroBatchingBolt extends BaseRichBolt {
    
        private static final long serialVersionUID = 8500984730263268589L;
        private static final Logger LOG = LoggerFactory.getLogger(MicroBatchingBolt.class);
    
        protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>();
    
        /** The threshold after which the batch should be flushed out. */
        int batchSize = 100;
    
        /**
         * The batch interval in sec. Minimum time between flushes if the batch sizes
         * are not met. This should typically be equal to
         * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs
         */
        int batchIntervalInSec = 45;
    
        /** The last batch process time seconds. Used for tracking purpose */
        long lastBatchProcessTimeSeconds = 0;
    
        private OutputCollector collector;
    
        @Override
        @SuppressWarnings("rawtypes")
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(Tuple tuple) {
          // Check whether this is a tick tuple
          if (isTickTuple(tuple)) {
            // If so, it signals a batch flush. But don't flush if the previous
            // flush happened very recently (either because the batch size
            // threshold was crossed or because of another tick tuple).
    
            if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
              LOG.debug("Current queue size is " + this.queue.size()
                  + ". But received tick tuple so executing the batch");
    
              finishBatch();
            } else {
              LOG.debug("Current queue size is " + this.queue.size()
                  + ". Received tick tuple but last batch was executed "
                  + (System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds)
                  + " seconds back that is less than " + batchIntervalInSec
                  + " so ignoring the tick tuple");
            }
          } else {
            // Add the tuple to queue. But don't ack it yet.
            this.queue.add(tuple);
            int queueSize = this.queue.size();
            LOG.debug("current queue size is " + queueSize);
            if (queueSize >= batchSize) {
              LOG.debug("Current queue size is >= " + batchSize
                  + " executing the batch");
    
              finishBatch();
            }
          }
        }
    
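        private boolean isTickTuple(Tuple tuple) {
            // Tick tuples come from the system component on the system tick stream.
            return org.apache.storm.Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && org.apache.storm.Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }
    
        /**
         * Asks Storm to send this bolt a tick tuple every batchIntervalInSec seconds.
         * Without this setting (or the topology-wide equivalent) no tick tuples arrive
         * and only the size threshold would ever trigger a flush.
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            Map<String, Object> conf = new java.util.HashMap<String, Object>();
            conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, batchIntervalInSec);
            return conf;
        }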
    
        /**
         * Finish batch.
         */
        public void finishBatch() {
    
          LOG.debug("Finishing batch of size " + queue.size());
          lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
          List<Tuple> tuples = new ArrayList<Tuple>();
          queue.drainTo(tuples);
    
          for (Tuple tuple : tuples) {
            // Prepare your batch here (be it JDBC, HBase, ElasticSearch, Solr or
            // anything else).
            // List<Response> responses = externalApi.get("...");
          }
    
          try {
            // Execute your batch here and ack or fail the tuples
            LOG.debug("Executed the batch. Processing responses.");
            //        for (int counter = 0; counter < responses.size(); counter++) {
            //          if (responses.get(counter).isFailed()) {
            //            LOG.error("Failed to process tuple # " + counter);
            //            this.collector.fail(tuples.get(counter));
            //          } else {
            //            LOG.debug("Successfully processed tuple # " + counter);
            //            this.collector.ack(tuples.get(counter));
            //          }
            //        }
          } catch (Exception e) {
            LOG.error("Unable to process " + tuples.size() + " tuples", e);
            // Fail entire batch
            for (Tuple tuple : tuples) {
              this.collector.fail(tuple);
            }
          }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // ... 
        }
    
    }
    

    Sources: http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ and "Using tick tuples with trident in storm"
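
    For anyone who wants to check the size-or-time flush logic in isolation, here is a minimal Storm-free sketch of the policy the skeleton above relies on. `MicroBatcher`, its `add`/`tick` methods and the injected clock are hypothetical names made up for illustration, not Storm or Trident APIs; the numbers (50 items, 60 seconds) are borrowed from the example in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Storm): buffers items and flushes by size or elapsed time. */
final class MicroBatcher<T> {
    private final int batchSize;
    private final long intervalMillis;
    private final LongSupplier clock;      // injectable so the policy can be tested without sleeping
    private final Consumer<List<T>> sink;  // receives each flushed batch
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMillis;

    MicroBatcher(int batchSize, long intervalMillis, LongSupplier clock, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.sink = sink;
        this.lastFlushMillis = clock.getAsLong();
    }

    /** Buffer one item; flush as soon as the size threshold is reached. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Timer signal; flush only if the interval has elapsed since the last flush. */
    void tick() {
        if (clock.getAsLong() - lastFlushMillis >= intervalMillis) {
            flush();
        }
    }

    private void flush() {
        lastFlushMillis = clock.getAsLong();
        if (buffer.isEmpty()) {
            return; // nothing buffered, but the interval timer is still reset
        }
        sink.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
        buffer.clear();
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock in milliseconds
        MicroBatcher<Integer> batcher = new MicroBatcher<Integer>(
            50, 60_000L, () -> now[0],
            batch -> System.out.println("flushed " + batch.size()));
        for (int i = 0; i < 55; i++) { // one item per simulated second
            now[0] += 1_000L;
            batcher.add(i);
        }
        batcher.tick();      // only 5 s since the size-triggered flush: ignored
        now[0] += 60_000L;
        batcher.tick();      // interval elapsed: flushes the 5 leftover items
    }
}
```

    In a bolt, `add` would correspond to the non-tick branch of `execute` and `tick` to the tick-tuple branch. Running `main` prints a batch of 50 followed by a batch of 5.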

    【Comments】:

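    • How do we make sure the same queue is not processed by two different threads? The finishBatch method should not be run by two different executors for the same queue. Any thoughts on this?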