【问题标题】:How to make a SQS delay queue of x hours如何制作 x 小时的 SQS 延迟队列
【发布时间】:2020-07-30 10:32:12
【问题描述】:

我需要一个队列来处理延迟 x 小时后的消息。而且我需要一种数据驱动的所有基于事件的方法,不使用任何调度程序等。

场景是我将一些实时数据发送到 SNS 主题,然后从那里发送到不同的 SQS 队列,以供不同的 AWS Lambda 函数使用。

其中一个 Lambda 函数需要在延迟 3 小时后处理消息。但是,最长交货延迟为 15 分钟。如果我第一次阅读该消息,它将自动从 SQS 中删除,因为我正在使用事件源映射触发器来调用 lambda 函数。

所以,我想知道如何避免删除消息并使其在第一次处理时不可见?

任何想法/帮助将不胜感激。

【问题讨论】:

  • 第一次阅读后不能删除邮件吗?也许您可以从队列中获取它,将其可见性更改为 3 小时(最长 12 小时)并在 3 小时内再次正确处理它。
  • 我一读到它们就会被事件源映射自动删除!
  • @Marcin ,我实际上有这个计划阅读它一次并将它留在那里直到可见性超时达到再次重新处理的阈值。但是,它似乎第一次被删除。我正在寻找一种方法来避免这个删除部分。
  • 如果您从 lambda 返回成功,是的,事件源映射将假定消息已成功处理并将其删除。如果您在更改可见性超时后在 lambda 中失败,事件源映射将不会删除该消息。相反,它将重试再次提交。我认为返回 200 以外的状态码会被视为失败?必须检查一下。
  • 哦!很高兴知道我认为只有调用失败才会将 msgs 再次返回到队列中......所以我的 lambda 中的任何异常都会对吗?我马上试试。。谢谢

标签: amazon-web-services aws-lambda amazon-sqs data-driven-tests


【解决方案1】:

Amazon SQS 不会执行您的请求。另外,我不建议使用任何“技巧”来强制延迟。

我建议您查看 AWS Step Functions。它可以协调 AWS Lambda 函数之间的交互,并且可以配置为在调用 AWS Lambda 函数之前等待(睡眠)一段时间

【讨论】:

    【解决方案2】:

    我测试过了。这似乎是可行的。我使用下面的代码进行测试。但是,这似乎不是做您想要实现的目标的“良好实践”方式。我看到两个主要问题:

    1. 机上按摩次数上限为 120,000。因此,您几乎可以无限扩展您的 SQS 队列。

    2. 您的指标将充满错误调用,并且很难区分真正失败的调用和故意失败的调用。

    因此,我会研究其他解决方案。

    import json
    import os
    import time
    
    import boto3
    
    sqs = boto3.client('sqs')
    
    queue_url = os.environ['QUEUE_URL']
    
    new_visibility_timeout = 120 # seconds
    
    def lambda_handler(event, context):
    
        print(json.dumps(event))
    
        current_time = time.time()
    
        no_of_new_records = 0
    
        for record in event['Records']:
    
            msg_timestamp = float(record['attributes']['SentTimestamp'])/1000
    
            msg_age = current_time - msg_timestamp
    
            print(f"Message age: {msg_age} seconds")
    
            if msg_age > new_visibility_timeout:
    
                print("Message to be successfully processed and deleted from queue")
    
                response = sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=record['receiptHandle']
                )
    
                print(response)
    
            else:
    
                print("Set long visibility timeout")
    
                response = sqs.change_message_visibility(
                    QueueUrl=queue_url,
                    ReceiptHandle=record['receiptHandle'],
                    VisibilityTimeout=new_visibility_timeout
                )
    
                print(response)
    
                no_of_new_records += 1
    
        if no_of_new_records > 0:
            raise Exception("Fail the lambda")
    
        return {'statusCode': 200}
    

    【讨论】:

    • 我试过这个,但对我不起作用!即使我提出了异常,消息仍然被删除..我可以看到它甚至改变了可见性超时,但这不像将该消息标记为不可见。它只是删除它:|
    • 您的 lambda 是否有权使用 SQS?还可以为主 SQS 队列设置 DLQ,这样如果处理不正确,消息就会去那里。
    • 是的,我也可以手动调用删除消息!但没有意义!因为无论如何它都会删除味精..我认为这是从 sqs 到 lambda 的触发器,这使得这种自动删除行为......如果我手动轮询队列,它将正常工作
    • 你还在使用延迟队列吗?我使用所有默认设置在常规队列中对其进行了测试。 lambda 的执行角色也有完整的 sqs 权限。也许如果您使用不同的设置,您需要相应地调整代码。作为概念证明,您可以使用提供的代码控制可见性超时。
    猜你喜欢
    • 2018-08-16
    • 2018-04-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-04
    • 1970-01-01
    • 2017-12-02
    • 2019-04-10
    相关资源
    最近更新 更多