Question title: Storm SQS messages not getting acked
Posted: 2015-12-15 01:17:58
Question:

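I have a topology with one spout class reading from 2 SQS queues, and 5 bolts. After processing, when I try to ack from the second bolt, the message does not get acked.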

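I am running it in reliable mode and trying to ack in the last bolt. I see the following log message, as if the message is being acked. However, the message is not deleted from the queue and my overridden ack() method is never called. It looks like the default ack method in backtype.storm.task.OutputCollector is called instead of the overridden method in my spout.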

8240 [Thread-24-conversionBolt] INFO  backtype.storm.daemon.task - Emitting: conversionBolt__ack_ack [-7578372739434961741 -8189877254603774958]

In my SQS queue spout I attach the message ID (the receipt handle) to the tuple and emit it to the first bolt:

collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());

My queue spout overrides the ack() and fail() methods. The queue's Default Visibility Timeout is set to 30 seconds.

Code snippet from my topology:

TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("firstQueueSpout",
                new SqsQueueSpout(StormConfigurations.getQueueURL()
                        + StormConfigurations.getFirstQueueName(), true),
                StormConfigurations.getAwsQueueSpoutThreads());

        builder.setSpout("secondQueueSpout",
                new SqsQueueSpout(StormConfigurations.getQueueURL()
                        + StormConfigurations.getSecondQueueName(),
                        true), StormConfigurations.getAwsQueueSpoutThreads());

        builder.setBolt("transformerBolt", new TransformerBolt(),
                StormConfigurations.getTranformerBoltThreads())
                .shuffleGrouping("firstQueueSpout")
                .shuffleGrouping("secondQueueSpout");

        builder.setBolt("conversionBolt", new ConversionBolt(),
                StormConfigurations.getTranformerBoltThreads())
                .shuffleGrouping("transformerBolt");

        // To dispatch it to the corresponding bolts based on packet type
        builder.setBolt("dispatchBolt", new DispatcherBolt(),
                StormConfigurations.getDispatcherBoltThreads())
                .shuffleGrouping("conversionBolt");

Code snippet from SqsQueueSpout (extends BaseRichSpout):

@Override
public void nextTuple() 
{
        if (queue.isEmpty()) {
            ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10));
            queue.addAll(receiveMessageResult.getMessages());
        }       
        Message message = queue.poll();
        if (message != null) 
        {
            try
            {
                JSONParser parser = new JSONParser();           
                JSONObject jsonObj = (JSONObject) parser.parse(message.getBody());
                //      ack(message.getReceiptHandle());
                if (reliable) {
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());
                } else {
                    // Delete it right away
                    sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));             
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()));
                }
            }
            catch (ParseException e) 
            {
                LOG.error("SqsQueueSpout ParseException in SqsQueueSpout.nextTuple(): ", e);
            }
        } else {
            // Still empty, go to sleep.
            Utils.sleep(sleepTime);
        }
    }

    public String getStreamId(Message message) {
        return Utils.DEFAULT_STREAM_ID;
    }

    public int getSleepTime() {
        return sleepTime;
    }

    public void setSleepTime(int sleepTime) 
    {
        this.sleepTime = sleepTime;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("......Inside ack in sqsQueueSpout..............."+msgId);
        // Only called in reliable mode.
        try {
            sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, (String) msgId));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void fail(Object msgId) {
        // Only called in reliable mode.
        try {
            sqs.changeMessageVisibilityAsync(
                    new ChangeMessageVisibilityRequest(queueUrl, (String) msgId, 0));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void close() {
        sqs.shutdown();
        ((AmazonSQSAsyncClient) sqs).getExecutorService().shutdownNow();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
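The spout above relies on two SQS behaviours: receiving a message does not delete it, it only hides it for the visibility timeout (30 seconds here), and the message becomes available again unless someone deletes it. The following is a minimal in-memory sketch of those semantics, assuming a logical clock and simplified receipt handles. `FakeSqsQueue` and its methods are hypothetical names that stand in for ReceiveMessage, DeleteMessage and ChangeMessageVisibility; this is not the AWS SDK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for one SQS queue (NOT the AWS SDK), used to
// reason about what SqsQueueSpout's nextTuple()/ack()/fail() do to a message.
final class FakeSqsQueue {
    private final Map<String, Long> invisibleUntil = new LinkedHashMap<>(); // message id -> hidden until (logical seconds)
    private final Map<String, String> handleToId = new HashMap<>();          // receipt handle -> message id
    private final long visibilityTimeoutSeconds;
    private long now = 0;      // logical clock in seconds
    private int handleSeq = 0; // each receive yields a fresh receipt handle

    FakeSqsQueue(long visibilityTimeoutSeconds) {
        this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
    }

    void send(String messageId) { invisibleUntil.put(messageId, 0L); }

    // Like ReceiveMessage: returns receipt handles and hides each message for the timeout.
    List<String> receive(int maxMessages) {
        List<String> handles = new ArrayList<>();
        for (Map.Entry<String, Long> e : invisibleUntil.entrySet()) {
            if (handles.size() == maxMessages) break;
            if (e.getValue() <= now) {
                e.setValue(now + visibilityTimeoutSeconds);
                String handle = e.getKey() + "#" + handleSeq++;
                handleToId.put(handle, e.getKey());
                handles.add(handle);
            }
        }
        return handles;
    }

    // Like DeleteMessage: what the spout's ack(msgId) does with the receipt handle.
    void delete(String handle) {
        String id = handleToId.remove(handle);
        if (id != null) invisibleUntil.remove(id);
    }

    // Like ChangeMessageVisibility(..., 0): what the spout's fail(msgId) does.
    void makeVisibleNow(String handle) {
        String id = handleToId.get(handle);
        if (id != null && invisibleUntil.containsKey(id)) invisibleUntil.put(id, now);
    }

    void advanceSeconds(long seconds) { now += seconds; }

    // Messages that a receive() would return right now.
    int available() {
        int n = 0;
        for (long t : invisibleUntil.values()) if (t <= now) n++;
        return n;
    }

    // Received but neither deleted nor visible again yet.
    int inFlight() { return invisibleUntil.size() - available(); }
}
```

In this model a message is only removed by delete(), i.e. by the spout's ack(). If the tuple tree never completes, the message just sits "in flight" and reappears once the timeout expires, which matches the symptom described in the question.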

Code snippet from my first bolt (extends BaseRichBolt):

public class TransformerBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    public static final Logger LOG = LoggerFactory.getLogger(TransformerBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String eventStr = input.getString(0);
            // some code here to convert the json string to a map
            // (Map dataMap, long packetId are sent to the next bolt)
            this.collector.emit(input, new Values(dataMap, packetId));
        } catch (Exception e) {
            LOG.warn("Exception while converting AWS SQS to HashMap :{}", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("dataMap", "packetId"));
    }
}

Code snippet from the second bolt:

public class ConversionBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input)
    {
        try {
            Map dataMap = (Map) input.getValue(0);
            Long packetId = (Long) input.getValue(1);

            // this ack is not working
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

Let me know if you need more information. Can someone explain why the overridden ack() in my spout is not called when I ack from my second bolt?

Question discussion:

    Tags: apache-storm amazon-sqs


    Solution 1:

    You have to ack all incoming tuples in all bolts, i.e., add collector.ack(input) to TransformerBolt.execute(Tuple input).

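    The log message you see is correct: your code calls collector.ack(...) and this call is logged. However, calling ack in a bolt is not a call to Spout.ack(...). Each time a Spout emits a tuple with a message ID, that ID is registered with the ackers running in the topology. These ackers receive a message for every ack from a Bolt, collect them, and notify the Spout once all acks for the tuple have been received. Only when the Spout receives this notification from an acker does it call its own ack(Object messageID) method.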
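To make the acker bookkeeping concrete, here is a simplified, single-process model of it. It assumes the XOR scheme described in Storm's "Guaranteeing Message Processing" documentation: every tuple gets a random 64-bit id, that id is XORed into a per-message "ack val" when the tuple is emitted anchored, and XORed out again when it is acked; the value reaching zero means the tuple tree is complete. `AckerModel` and its methods are hypothetical names for illustration, not Storm classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;

// Simplified model of Storm's acker bookkeeping (hypothetical; not Storm's actual code).
final class AckerModel {
    private final Map<Object, Long> ackVals = new HashMap<>(); // spout msgId -> XOR of pending tuple ids
    private final Consumer<Object> spoutAck;                    // stands in for Spout.ack(msgId)
    private final Random random = new Random(42);

    AckerModel(Consumer<Object> spoutAck) { this.spoutAck = spoutAck; }

    // Spout emits a root tuple with a message id (e.g. the SQS receipt handle).
    long spoutEmit(Object msgId) {
        long id = newId();
        ackVals.put(msgId, id);
        return id;
    }

    // A bolt emits a child anchored to its input: the child's id is XORed in.
    long anchoredEmit(Object msgId) {
        long id = newId();
        ackVals.merge(msgId, id, (a, b) -> a ^ b);
        return id;
    }

    // A bolt acks a tuple: its id is XORed out; zero means the whole tree is done.
    void ack(Object msgId, long tupleId) {
        Long val = ackVals.get(msgId);
        if (val == null) return; // already completed (or unknown)
        long updated = val ^ tupleId;
        if (updated == 0) {
            ackVals.remove(msgId);
            spoutAck.accept(msgId); // only now would the spout delete the SQS message
        } else {
            ackVals.put(msgId, updated);
        }
    }

    private long newId() {
        long id;
        do { id = random.nextLong(); } while (id == 0);
        return id;
    }

    public static void main(String[] args) {
        List<Object> acked = new ArrayList<>();
        AckerModel acker = new AckerModel(acked::add);
        long root = acker.spoutEmit("receipt-1");     // SqsQueueSpout: emit(..., receiptHandle)
        long child = acker.anchoredEmit("receipt-1"); // TransformerBolt: emit(input, ...), no ack
        acker.ack("receipt-1", child);                // ConversionBolt: ack(input)
        System.out.println("after ConversionBolt ack: " + acked);  // prints []
        acker.ack("receipt-1", root);                 // the missing ack in TransformerBolt
        System.out.println("after TransformerBolt ack: " + acked); // prints [receipt-1]
    }
}
```

The demo mirrors the question: ConversionBolt's ack alone leaves the root tuple's id pending, so the spout's ack() only runs once TransformerBolt also acks its input.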

    For more details, see: https://storm.apache.org/documentation/Guaranteeing-message-processing.html

    Comments:

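    • Thanks Matthias. It works fine after acking in every bolt. The moment I acked in the first bolt, the message disappeared from the "Messages Available" count in SQS, which made me wonder how I would get it back if processing later failed. So I only added the ack in the last bolt. My assumption was wrong; it works perfectly. Thanks again.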
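    • The important point is to use the input tuple as the anchor in intermediate bolts (i.e., non-sink bolts) and then ack the input tuple. That ack does not tell Storm that the original input tuple has been fully processed (yet); it only says that this step (i.e., this bolt) has finished its processing.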
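The effect of anchoring in an intermediate bolt can be sketched with an even smaller model. This is an assumption-laden simplification: it tracks pending tuples in a `Set` instead of Storm's XORed ids, and `TupleTree`, `emitAnchored` and `emitUnanchored` are hypothetical names that only mirror `collector.emit(input, values)` and `collector.emit(values)`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, set-based stand-in for one Storm tuple tree (Storm itself uses XORed 64-bit ids).
final class TupleTree {
    private final Set<String> pending = new HashSet<>();
    private boolean failed = false;
    private boolean completed = false;

    TupleTree(String rootTuple) { pending.add(rootTuple); }

    // Like collector.emit(input, values): the child is anchored, so it joins the tree.
    void emitAnchored(String child) { pending.add(child); }

    // Like collector.emit(values) without an anchor: the child is not tracked by this tree.
    void emitUnanchored(String child) { /* nothing recorded */ }

    void ack(String tuple) {
        pending.remove(tuple);
        if (pending.isEmpty() && !failed) completed = true; // spout's ack(msgId) would run
    }

    void fail(String tuple) {
        if (pending.contains(tuple)) failed = true; // spout's fail(msgId) would run
    }

    boolean completed() { return completed; }
    boolean failed() { return failed; }

    public static void main(String[] args) {
        TupleTree anchored = new TupleTree("root");
        anchored.emitAnchored("t1"); // intermediate bolt: emit(input, values)
        anchored.ack("root");        // intermediate bolt: ack(input), tree still open
        anchored.fail("t1");         // downstream bolt fails its input
        // prints "anchored: completed=false, failed=true"
        System.out.println("anchored: completed=" + anchored.completed() + ", failed=" + anchored.failed());

        TupleTree unanchored = new TupleTree("root");
        unanchored.emitUnanchored("t1"); // intermediate bolt: emit(values), no anchor
        unanchored.ack("root");          // tree completes immediately
        unanchored.fail("t1");           // downstream failure is not seen by the spout
        // prints "unanchored: completed=true, failed=false"
        System.out.println("unanchored: completed=" + unanchored.completed() + ", failed=" + unanchored.failed());
    }
}
```

With the anchor, the intermediate bolt's ack leaves the downstream tuple pending, and a downstream failure still reaches the spout (whose fail() makes the SQS message visible again). Without it, the tree completes as soon as the intermediate bolt acks, so the message would be deleted and a later failure downstream would be lost.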