【问题标题】:Best practice for polling an AWS SQS queue and deleting received messages from queue?轮询 AWS SQS 队列并从队列中删除收到的消息的最佳实践?
【发布时间】:2015-09-28 02:16:27
【问题描述】:
我有一个 SQS 队列,它不断被数据消费者填充,我现在正在尝试创建一个服务,该服务将使用 Python 的 boto 从 SQS 中提取这些数据。
我的设计方式是让 10-20 个线程都尝试从 SQS 队列中读取消息,然后执行它们必须对数据执行的操作(业务逻辑),然后再返回队列获取完成后的下一批数据。如果没有数据,他们只会等到一些数据可用。
我对这个设计有两个不确定的地方
- 是否需要使用较长的 time_out 值调用 receive_message(),如果在 20 秒(允许的最大值)内没有返回任何内容,然后重试?或者是否有一个阻塞方法只在数据可用时返回?
- 我注意到一旦我收到一条消息,它并没有从队列中删除,我是否必须接收一条消息,然后在收到消息后发送另一个请求才能将它从队列中删除?似乎有点矫枉过正。
谢谢
【问题讨论】:
标签:
python
amazon-web-services
boto
amazon-sqs
【解决方案1】:
receive_message() 方法的长轮询功能是轮询 SQS 的最有效方式。如果返回时没有任何消息,我建议在重试之前稍作延迟,尤其是在您有多个阅读器的情况下。您甚至可能想要进行增量延迟,以便每次后续的空读取等待更长的时间,这样您就不会最终受到 AWS 的限制。
是的,您必须在阅读后删除该消息,否则它将重新出现在队列中。这在工作人员读取消息然后在完全处理消息之前失败的情况下实际上非常有用。在这种情况下,它将被重新排队并由另一个工作人员读取。您还需要确保将消息的不可见超时设置为足够长,以便工作人员有足够的时间在消息自动重新出现在队列中之前对其进行处理。如有必要,如果超时时间超过预期,您的工作人员可以在处理时调整超时。
【解决方案2】:
另一种选择是使用 AWS Beanstalk 设置工作程序应用程序,如 this blogpost 中所述。
您的烧瓶应用程序不是使用 boto3 进行长轮询,而是在 HTTP 帖子中将消息作为 json 对象接收。正在设置的 HTTP 路径和消息类型可在 AWS Elastic Beanstalk 配置选项卡中进行配置:
AWS Elastic Beanstalk 的另一个好处是能够根据 SQS 队列的大小动态扩展工作人员的数量,以及它的部署管理优势。
This 是一个示例应用程序,我发现它可用作模板。
【解决方案3】:
如果您想要一种简单的方法来设置侦听器,包括在消息处理完成后自动删除消息,并将异常自动推送到指定队列,您可以使用pySqsListener 包。
你可以像这样设置一个监听器:
from sqs_listener import SqsListener
class MyListener(SqsListener):
def handle_message(self, body, attributes, messages_attributes):
run_my_function(body['param1'], body['param2']
listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()
有一个标志可以从短轮询切换到长轮询 - 它都记录在 README 文件中。
免责声明:我是该软件包的作者。