【问题标题】:Execute lambda function for multiple files in S3在 S3 中为多个文件执行 lambda 函数
【发布时间】:2019-10-19 13:54:36
【问题描述】:

我试图找出在随机添加文件时一次在 S3 中处理多个文件的逻辑。为了讨论起见,这里有一个例子:

  1. 文件随机添加到 S3 存储桶;突发或随机间隔
  2. 一旦 S3 存储桶中有 9 个文件,就会触发 Lambda 函数; lambda 函数将这些文件后处理或组合在一起。
  3. 处理后,文件将被移动到另一个存储桶或删除。

这是我尝试过的:

  • 我有所有 S3 放置的 S3 触发器
  • 在我的 lambda 函数中,我忽略文件名本身并根据键列出 S3 存储桶以计算存在多少文件
  • 问题是当流量突发或稳定到达但速度很快时,很难识别唯一的 9 个文件组
  • 出于性能原因,我在文件名上添加了 uuid 前缀,因此不存在顺序文件名。
  • 我考虑过将元数据写入 nosql 数据库,但还没有走这条路。

【问题讨论】:

  • 也许预定的 lambda 可以实现这一点?
  • 可能,但如果我将时间表设置为接近实时(比如每 5 秒),它可能无法按预期工作。我不能保证我的函数会在
  • 对于仅添加3个文件(或数量少于9个)并且2小时没有更多文件或流量的情况,您是否希望lambda处理这3个文件?这些文件有多大?文件是如何放入存储桶的?应用程序接口?还是用户直接使用 URL 发起的?

标签: python-3.x aws-lambda amazon-sqs


【解决方案1】:

一种可能的解决方案是使用计划的 lambda(可以根据您的流量尽可能频繁或尽可能稀疏),从由 S3 put 事件填充的 SQS 队列中提取事件。假设您一次专注于批处理 n 文件,并且顺序无关紧要(给定 uuid 命名)。

要创建这个工作流程,应该是这样的:

  1. 创建用于保存 S3 PUT 事件的 SQS 队列
  2. 将触发器添加到 PUT 上的 S3 存储桶,以在 SQS 队列中从 1 创建事件。
  3. 使用环境变量创建 Lambda(用于存储桶和队列)
    1. 如果有任何正在进行的消息,则 lambda 应检查队列并仅使用存储桶
    2. 如果有,停止运行(防止文件被多次处理)
    3. 如果没有正在进行的消息,则列出来自 S3 的对象,限制为 n(您的批量大小)
    4. 如果返回了足够多的对象(可能小于n),则运行您的流程逻辑
    5. 删除文件
  4. 为每n 秒/分钟/小时运行一次 lambda 创建 CloudWatch 规则

根据您的具体情况,需要记住的其他一些事项:

  • 如果有大量文件被快速发送并且n 非常小,则单跟踪处理(步骤 3.2 会导致处理时间过长)。这也取决于处理时间的长短,数据是否可以多次处理等等......
  • ListObjectsV2 可能返回小于 MaxKeys 参数,如果这是一个问题,可以有一个更大的 MaxKeys 并只处理第一个 n

【讨论】:

    【解决方案2】:

    您还可以考虑使用触发 lambda/glue 作业的 step 函数将文件进一步复制到 Redshift/s3,引入一些文件计数逻辑(假设到达的文件数量固定)/等待时间(例如 30分钟假设所有文件都已登陆)。这不是完美的解决方案,但如果您修复文件流,它可能会工作得很好。

    【讨论】:

      猜你喜欢
      • 2019-12-11
      • 2020-06-30
      • 2023-04-02
      • 2019-02-26
      • 1970-01-01
      • 2022-01-23
      • 2023-01-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多