【问题标题】:AWS Lambda Function invoked by SQS trigger is not respecting the visibility timeout I'm manually setting within the functionSQS 触发器调用的 AWS Lambda 函数不遵守我在函数中手动设置的可见性超时
【发布时间】:2020-02-24 19:22:04
【问题描述】:

我正在实现自己的 webhook 服务,它将向订阅的 webhook 发送事件。

架构概述:

  • 事件被推送到 SQS 队列中
  • 一个 lambda 函数由 SQS 消息触发(事件源映射)
  • 对于每个事件,我都会向订阅的 webhook 发出传出 http 请求
  • 必须使用指数退避重试非 2xx 响应(在这种情况下,我会更改收到消息的消息可见性)
  • 由于 SQS 调用的 lambdas 将在完成后自动删除消息,因此我在函数末尾抛出错误以防止自动删除

据我所知,更改消息可见性的调用正在成功。我想知道 SQS 调用的 lambda 中是否还有其他内容。在 lambda 失败后,它是否会再次在内部更改消息可见性?或者 SQS 调用的 lambdas 是否不尊重消息可见性更改(这对我来说真的没有任何意义)。好奇是否有人对这个问题有任何见解。我很惊讶地发现 lambda 成功后会自动删除消息,因为它让我的特定用例感觉有点笨拙 - 抛出错误以使 lambda 函数失败以防止消息被删除。

提前致谢!

【问题讨论】:

    标签: amazon-web-services aws-lambda amazon-sqs


    【解决方案1】:

    更新:实际上并非如此。我未能正确等待调用来调整超时完成。因此,lambda 在该请求完成之前就关闭了。我在 lambda 内的消息上设置的超时受到尊重。然后我抛出一个错误以防止消息被删除。

    【讨论】:

    • 您应该将此标记为您问题的答案。
    • 我对下面的答案和这个答案有点困惑。是否可以在 lambda 中更改需要更长处理时间的消息的可见性,还是会因为 lambda 独立处理轮询而失败?
    【解决方案2】:

    SQS 与 Lambda 集成的本质是集成控制消息的轮询。用于确定是否应该删除消息的机制是来自 lambda 的响应不是错误。 documentation 中没有明确说明,但我相信当发生错误时,集成会将可见性超时设置为零,以使其立即可供另一个进程获取。因此,在您的示例中,您将其设置为允许您重试的某个数字,但是当您返回错误时,集成会将超时设置回零。如果您需要对该过程进行更多控制,您可能不应该使用集成。

    【讨论】:

    • 我担心是这样的。他们似乎确实在他们的文档中说明了这一点,但它非常微妙。 - “当 Lambda 读取批次时,消息保留在队列中,但在队列的可见性超时长度内隐藏。如果您的函数成功处理该批次,Lambda 会从队列中删除消息。如果您的函数受到限制,则返回一个错误,或者没有响应,消息再次可见。失败的批处理中的所有消息都返回到队列中,因此您的函数代码必须能够多次处理相同的消息而没有副作用。"
    • 它说明了这一点,但没有明确说明它会重置超时。
    • 有这个注释指出了至少一种情况,其中 Lambda 没有在调用之间重置可见性超时。 (source): > 要让您的函数有时间处理每批记录,请将源队列的可见性超时设置为您在函数上配置的超时的至少 6 倍。如果您的函数在处理前一批时您的函数执行受到限制,额外的时间允许 Lambda 重试。
    【解决方案3】:

    由于 lambdas 的 SQS 触发器批量处理消息,因此在标准 forEach 中使用 await 不起作用(如果回调是异步的,则 forEach 不会等待回调)。要绕过它,您可以创建自己的异步 forEach 版本:

    var AWS = require('aws-sdk');
    
    exports.handler = async function(event, context) {
        var sqs = new AWS.SQS({apiVersion: '2012-11-05'});
        
        await asyncForEach(event.Records, async record => {
            const { receiptHandle } = record;
                const sqsParams = {
                  QueueUrl: '<YOUR_QUEUE_URL>', /* required */
                  ReceiptHandle: receiptHandle, /* required */
                  VisibilityTimeout: '<in seconds>' /* required */
                };
                try {
                    let res = await sqs.changeMessageVisibility(sqsParams).promise();
                    console.log(res);
                } catch (err) {
                    console.log(err, err.stack);
                    throw new Error('Fail.');
                }
            }
        });
        return {};
    };
    
    async function asyncForEach(array, callback) {
      for (let index = 0; index < array.length; index++) {
        await callback(array[index], index, array);
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-01
      • 2020-10-15
      • 2020-02-15
      • 2021-05-14
      • 2021-05-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-17
      相关资源
      最近更新 更多