【问题标题】:Lambda - Python - CSV to NDJSON - Fails to dump huge filesLambda - Python - CSV 到 NDJSON - 无法转储大文件
【发布时间】:2020-12-17 19:06:13
【问题描述】:

我正在开发一个 lambda,它将存储在 Bucket-A(源)中的 CSV 文件转换为 NDJSON,并将其移动到 Bucket-B(目标)

下面的逻辑对小文件可以正常工作,但我的 CSV 文件预计会超过 200 MB,有些约为 2.5GB,即使 lambda 设置为最大超时,此逻辑也会超时。

我在看一个帖子,说的是使用 lambda tmp 空间直接将信息写入/附加到文件,可以上传到 S3,但 tmp 空间的最大大小约为 ~500 MB

感谢您通读。
非常感谢任何解决此问题的帮助。

import boto3
import ndjson
import csv
from datetime import datetime, timezone
from io import StringIO
import os

def lambda_handler(event, context):
    errMsg = None
    target_resp_list = []
    l_utcdatetime = datetime.utcnow()
    l_timestamp = l_utcdatetime.strftime('%Y%m%d%H%M%S')
    
    s3 = boto3.resource('s3')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb', region_name=os.environ['AWS_REGION'])
    
    for record in event["Records"]:
        
        # Source bucket and key of the new file landed
        source_bucket = record["s3"]["bucket"]["name"]
        source_key = record["s3"]["object"]["key"]
        source_file_name = source_key.split("/")[-1].split(".")[0]
        
        bucket = s3.Bucket(source_bucket)
        obj = bucket.Object(key=source_key)
        response = obj.get()
        records = StringIO(response['Body'].read().decode())

        # loop through the csv records and add it to the response list, while adding the snapshot_datetime to each record
        for row in csv.DictReader(records):
            row['source_snapshot_datetime'] = f'{l_utcdatetime}'
            target_resp_list.append(row)

        # The below attributes are used in copying the ndjson file to the destination bucket
        l_target_bucket = os.getenv("TargetBucket")
        l_target_prefix = os.getenv("TargetPrefix")
        l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}.ndjson"

        # Moving the ndjson file to Snowflake staging bucket
        try:
            s3_client.put_object(Body=ndjson.dumps(target_resp_list), 
                Bucket=l_target_bucket, 
                Key=l_target_key
            )
            print("File moved to destination bucket.")
        except Exception as ex1:
            errMsg = f"Error while copying the file from source to destination bucket - {ex1}"
        
        # Raise exception in case of copy fail
        if errMsg is not None:
            raise Exception(errMsg)

【问题讨论】:

  • 为什么不使用 fargate 而不是 lambda?
  • 这个 lambda 预计是基于 S3 put 的事件驱动,这可能与 Fargate 相关吗?
  • 好的,我做了一些挖掘工作,我的架构目前不涉及 Fargate,但是这似乎需要大量额外的工作来创建、维护和保护用于此任务的容器。跨度>

标签: python amazon-web-services aws-lambda ndjson


【解决方案1】:

Lambda 每次执行最多可以运行 15 分钟。 我建议首先检查在本地首先处理文件的最坏情况。如果您希望文件很大,请尝试将 lambda 内存提高到满足您要求的最大可行值。

提示:

  • 尝试压缩文件,按GB压缩的CSV文件减少到兆字节,文本可以压缩很多。
  • 尝试提前拆分工作,如果这个巨大的文件可以由一个 lambda 拆分并由另一个 lambda 处理,您将不会太在意执行 timo-out。

【讨论】:

  • 是的,我想到了分离逻辑,一个 lambda 来拆分文件,然后一个 S3 事件触发器来调用另一个 lambda 进行处理。但我已经在处理两个暂存桶了,
【解决方案2】:

这里留给以后可能会来看的人。

我认为问题在于 ndjson.dumps 需要花费大量时间来转换列表并推送到 S3,因此我所做的是使用计数器来分块源记录 - 每个 50K,然后调用 dumpChunkToS3(),这基本上是逻辑转储到 S3。
需要一个额外的条件语句,因为行/记录的数量甚至不会除以 50K(在我的情况下是租用的)

# loop through the csv records and add it to the response list, while adding the snapshot_datetime to the record
for row in csv.DictReader(records):
    row['source_snapshot_datetime'] = f'{l_utcdatetime}'
    rowscnt += 1
    target_resp_list.append(row)
    if rowscnt == 50000:
        chunk_id += 1
        respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)
        rowscnt = 0
        del target_resp_list[:]

if rowscnt > 0:
    chunk_id += 1
    respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)

【讨论】:

    猜你喜欢
    • 2021-03-07
    • 1970-01-01
    • 2020-11-23
    • 1970-01-01
    • 2021-05-30
    • 1970-01-01
    • 2018-06-30
    • 2020-12-09
    • 2016-02-08
    相关资源
    最近更新 更多