【Question title】: High memory usage and slow upload using AWS S3 TransferUtility
【Posted】: 2020-05-22 19:14:10
【Question description】:

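I have created an application in C# using the AWS .NET SDK (v3.3.106.25) to upload some database backup files to an S3 bucket. The application is currently unusable: memory usage climbs to 100% and each file takes longer and longer to upload.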

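I am trying to upload 3 files: two of about 1.45 GB and one of about 4 MB. I am using the TransferUtility class because I understand it uses multipart upload, and I have set the part size to 16 MB. The files are uploaded one after another. Here are some facts about the uploads: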

File 1 - 4 MB - upload took 4 seconds

File 2 - 1.47 GB - upload took 11.5 minutes

File 3 - 1.45 GB - still uploading after 1 hour 12 minutes, when I killed the process because the PC had become unusable
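For scale, here is the part arithmetic for these uploads with 16 MB parts. This is a small illustrative sketch; the `PartMath` class and its methods are hypothetical names, and the byte counts used below are approximations of the sizes quoted above (taking 1 GB as 1024³ bytes).

```csharp
using System;

// Hypothetical helper: how a file is divided into fixed-size multipart parts.
public static class PartMath
{
    public const long MiB = 1024 * 1024;

    // Number of parts = ceiling(fileSize / partSize).
    public static long CountParts(long fileSize, long partSize) =>
        (fileSize + partSize - 1) / partSize;

    // Every part is partSize bytes except the last, which holds the remainder.
    // Assumes fileSize > 0.
    public static long LastPartSize(long fileSize, long partSize)
    {
        long remainder = fileSize % partSize;
        return remainder == 0 ? partSize : remainder;
    }
}
```

With 16 MB parts, a file of roughly 1.47 GB (about 1,578,400,481 bytes under the assumption above) splits into 95 parts, the last one about 1.3 MB, so a single part is small compared with the machine's 16 GB of RAM.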

I am running this on a Windows 10 machine with 16 GB of RAM and an Intel Core i7 CPU @ 3.40 GHz.

Here is my upload code:

private async Task UploadFileAsync(string keyName, string filePath, int partSizeMB, S3StorageClass  storageClass)
{
    try
    {
        using (IAmazonS3 s3Client = new AmazonS3Client(_region))
        {
            var fileTransferUtility = new TransferUtility(s3Client);

            var fileTransferUtilityRequest = new TransferUtilityUploadRequest
            {
                BucketName = _bucketName,
                FilePath = filePath,
                StorageClass = storageClass,
                PartSize = partSizeMB * 1024 * 1024,    // set to 16MB
                Key = keyName,
                CannedACL = S3CannedACL.Private
            };

            await fileTransferUtility.UploadAsync(fileTransferUtilityRequest);
        }

    }
    catch (AmazonS3Exception e)
    {
        string errMsg = string.Format("Error encountered on server. Message:'{0}' when writing an object", e.Message);
        // pass e itself (not e.InnerException) so the original S3 error and stack trace are preserved
        throw new Exception(errMsg, e);
    }
    catch (Exception e)
    {
        string errMsg = string.Format("Unknown error encountered on server. Message:'{0}' when writing an object", e.Message);
        throw new Exception(errMsg, e);
    }

}

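This method is called 3 times in a loop, and each call is awaited.

Can anyone suggest a more efficient way to upload these files?

Many thanks.

【Question discussion】:

    Tags: c# amazon-s3 aws-sdk-net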


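    【Solution 1】:

    I decided to stop using the high-level TransferUtility API, because it did not seem suited to large files. It appears to load the whole file into memory, then split it into parts and upload each one. With large files it simply consumes all available memory, which can bring your server to a halt.

    For anyone interested, this is how I solved the problem:

    1. I now use the low-level API methods InitiateMultipartUploadAsync, UploadPartAsync and CompleteMultipartUploadAsync and manage the multipart upload myself.
    2. The key to making this work is the .NET MemoryMappedFile class and its CreateViewStream method, which let me bring only one part of the file into memory at a time.
    3. I use a queue to keep track of which parts have been uploaded and to retry any individual part that fails.

    Here is my new code: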
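The memory-mapped technique in step 2 can be tried on its own, without S3. This is a minimal sketch under stated assumptions: `MappedPartReader` and `ForEachPart` are hypothetical names, and the `onPart` callback stands in for whatever uploads a part (in the answer, `UploadPartAsync`). Only one part's bytes are copied into managed memory at a time.

```csharp
using System;
using System.IO;
using System.IO.MemoryMappedFiles;

// Hypothetical helper: walk a file part by part through read-only memory-mapped views.
public static class MappedPartReader
{
    // Calls onPart(partNumber, bytes) for each part; partNumber is 1-based, like S3 part numbers.
    // Returns the number of parts produced.
    public static int ForEachPart(string filePath, long partSize, Action<int, byte[]> onPart)
    {
        long fileSize = new FileInfo(filePath).Length;
        if (fileSize == 0) return 0;    // an empty file cannot be memory-mapped
        int parts = (int)((fileSize + partSize - 1) / partSize);

        // Read-only map with no map name (a named map is a system-wide object on Windows).
        using (var mmf = MemoryMappedFile.CreateFromFile(filePath, FileMode.Open, null, 0, MemoryMappedFileAccess.Read))
        {
            for (int i = 0; i < parts; i++)
            {
                long offset = i * partSize;
                long size = Math.Min(partSize, fileSize - offset);
                using (var view = mmf.CreateViewStream(offset, size, MemoryMappedFileAccess.Read))
                {
                    // Read exactly `size` bytes; view.Length may be rounded up to a page boundary.
                    var buffer = new byte[size];
                    int read = 0;
                    while (read < size)
                    {
                        int n = view.Read(buffer, read, (int)size - read);
                        if (n == 0) throw new EndOfStreamException();
                        read += n;
                    }
                    onPart(i + 1, buffer);
                }
            }
        }
        return parts;
    }
}
```

Reading a fixed number of bytes, rather than `view.Length`, matters for the last part: a view's length can be padded up to the page size, and those padding bytes must not be uploaded.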

    using Amazon;
    using Amazon.S3;
    using Amazon.S3.Model;
    using System;
    using System.IO;
    using System.Threading.Tasks;
    using System.Threading;
    using System.Collections.Generic;
    using System.IO.MemoryMappedFiles;
    using System.Linq;
    using Amazon.Runtime;
    
    
    public class S3Upload
    {
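        // declarations
        private readonly string _bucketName;
        private readonly RegionEndpoint _region;

        // The readonly fields above are never assigned in the posted code, so the class needs a
        // constructor; this one is a minimal addition.
        public S3Upload(string bucketName, RegionEndpoint region)
        {
            _bucketName = bucketName;
            _region = region;
        }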
    
        //event handlers
        public event EventHandler<ProgressUpdatedEventArgs> OnProgressUpdated;
    
        private bool CheckFilePath(string filePath)
        {
            // File.Exists also returns false when the directory is missing. A separate
            // Directory.Exists(Path.GetDirectoryName(...)) check wrongly rejects bare relative
            // file names, because GetDirectoryName returns "" for them.
            return File.Exists(filePath);
        }
    
        public async Task UploadFileMultiPartAsync(string keyName, string filePath, string storageClass,
                                            int partSizeMB = 16, int retryWaitInterval = 60000,
                                            int maxRetriesOnFail = 10)
        {
            if (CheckFilePath(filePath))
            {
                long fileSize = new FileInfo(filePath).Length;
                long partSize = partSizeMB * (long)Math.Pow(1024, 2);
                partSize = GetPartSize(fileSize, partSize);
                S3StorageClass sClass = new S3StorageClass(storageClass);
    
                // let exceptions propagate unchanged so callers see the original type and stack trace
                await UploadFileMultiPartAsync(keyName, filePath, fileSize, partSize, sClass, retryWaitInterval, maxRetriesOnFail);
            }
            else
            {
                string errMsg = string.Format("Cannot find file {0}.  Check the file exists and that the application has access permissions.", filePath);
                throw new FileNotFoundException(errMsg, filePath);
            }
        }
    
        private async Task UploadFileMultiPartAsync(string keyName, string filePath, long fileSize,
                                                        long partSize, S3StorageClass storageClass,
                                                        int retryWaitInterval,
                                                        int maxRetriesOnFail)
        {
            int retryCount = 0;
            // we need to calculate the number of parts based on the fileSize and partSize
            int iterations = (int)Math.Ceiling((double)fileSize / (double)partSize);
            // Queue of part indexes still to be uploaded. A part that fails is re-added to the back of
            // the queue; after each full pass over the queue we pause before retrying the failed parts.
            Queue<int> q = new Queue<int>(Enumerable.Range(0, iterations));
            // number of queue entries still to be processed in the current pass
            int remainingInPass = iterations;
            // the following 2 variables store values returned from the S3 calls and are persisted throughout the loop
            string uploadId = "";
            List<PartETag> eTags = new List<PartETag>();
            // Map the file read-only. No map name is given: a named map is a system-wide object
            // and would clash if two uploads ran at the same time.
            using (var mmf = MemoryMappedFile.CreateFromFile(filePath, FileMode.Open, null, 0, MemoryMappedFileAccess.Read))
            {
                while (q.Count > 0)
                {
                    int iPart = q.Dequeue();
                    remainingInPass--;
                    long offset = iPart * partSize;
                    long chunkSize = Math.Min(partSize, fileSize - offset);
                    // only this part's view of the file is mapped into memory
                    using (var stream = mmf.CreateViewStream(offset, chunkSize, MemoryMappedFileAccess.Read))
                    using (var binReader = new BinaryReader(stream))
                    {
                        // read exactly chunkSize bytes: the view stream's Length can be rounded up to the
                        // system page size, which would pad the final part with zero bytes
                        byte[] bytes = binReader.ReadBytes((int)chunkSize);
                        using (var mStream = new MemoryStream(bytes, false))
                        {
                            bool lastPart = q.Count == 0;

                            UploadResponse response = await UploadChunk(keyName, uploadId, iPart, lastPart, mStream, eTags, iterations, storageClass);
                            uploadId = response.uploadId;
                            eTags = response.eTags;

                            if (!response.success)
                            {
                                // the upload failed so we add the failed index to the back of the
                                // queue for retry later
                                q.Enqueue(iPart);
                            }
                        }
                    }

                    // If we have attempted every part in this pass and some have failed, we wait a bit and
                    // then resend the failed parts. We try this maxRetriesOnFail times, then give up.
                    if (remainingInPass == 0 && q.Count > 0)
                    {
                        if (retryCount >= maxRetriesOnFail)
                        {
                            // reached maximum retries so we abort the upload and raise an error
                            string errMsg = "Multi part upload aborted. Some parts could not be uploaded. Maximum number of retries reached.";
                            try
                            {
                                if (uploadId != "")
                                {
                                    await AbortMultiPartUploadAsync(keyName, uploadId);
                                }
                            }
                            catch (Exception ex)
                            {
                                errMsg = string.Format("Multi part upload failed. Maximum number of retries reached.  Unable to abort upload. Error: {0}", ex.Message);
                            }
                            throw new Exception(errMsg);
                        }

                        retryCount += 1;
                        remainingInPass = q.Count;
                        // Task.Delay waits without blocking a thread, unlike Thread.Sleep
                        await Task.Delay(retryWaitInterval);
                    }
                }
            }
        }

        private async Task AbortMultiPartUploadAsync(string keyName, string uploadId)
        {
            using (var _s3Client = new AmazonS3Client(_region))
            {
                AbortMultipartUploadRequest abortMPURequest = new AbortMultipartUploadRequest
                {
                    BucketName = _bucketName,
                    Key = keyName,
                    UploadId = uploadId
                };
                await _s3Client.AbortMultipartUploadAsync(abortMPURequest);
            }
        }

        private async Task<UploadResponse> UploadChunk(string keyName, string uploadId, int chunkIndex, bool lastPart, MemoryStream stream, List<PartETag> eTags, int numParts, S3StorageClass storageClass)
        {
            try
            {
                using (var _s3Client = new AmazonS3Client(_region))
                {
                    var partNumber = chunkIndex + 1;

                    // Step 1: build and send a multi upload request
                    // we check uploadId == "" rather than chunkIndex == 0 as if the initiate call failed on the first run
                    // then chunkIndex = 0 would have been added to the end of the queue for retries and uploadId
                    // will still not have been initialized, even though we might be on a later chunkIndex
                    if (uploadId == "")
                    {
                        var initiateRequest = new InitiateMultipartUploadRequest
                        {
                            BucketName = _bucketName,
                            Key = keyName,
                            // without this the requested storage class is silently ignored
                            StorageClass = storageClass
                        };

                        InitiateMultipartUploadResponse initResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest);
                        uploadId = initResponse.UploadId;
                    }

                    // Step 2: upload each chunk (this is run for every chunk unlike the other steps which are run once)
                    var uploadRequest = new UploadPartRequest
                    {
                        BucketName = _bucketName,
                        Key = keyName,
                        UploadId = uploadId,
                        PartNumber = partNumber,
                        InputStream = stream,
                        IsLastPart = lastPart,
                        PartSize = stream.Length
                    };

                    // Track upload progress.
                    uploadRequest.StreamTransferProgress +=
                        (_, e) => OnPartUploadProgressUpdate(numParts, uploadRequest, e);

                    UploadPartResponse uploadResponse = await _s3Client.UploadPartAsync(uploadRequest);

                    // record this part's ETag, replacing any entry left by an earlier attempt at the same part
                    eTags.RemoveAll(t => t.PartNumber == partNumber);
                    eTags.Add(new PartETag
                    {
                        PartNumber = partNumber,
                        ETag = uploadResponse.ETag
                    });

                    //Step 3: on the last part, build and send the multipart complete request
                    if (lastPart)
                    {
                        var completeRequest = new CompleteMultipartUploadRequest
                        {
                            BucketName = _bucketName,
                            Key = keyName,
                            UploadId = uploadId,
                            // S3 requires parts in ascending order; retries can leave the list unordered
                            PartETags = eTags.OrderBy(t => t.PartNumber).ToList()
                        };

                        await _s3Client.CompleteMultipartUploadAsync(completeRequest);
                    }

                    return new UploadResponse(uploadId, eTags, true);
                }
            }
            catch
            {
                return new UploadResponse(uploadId, eTags, false);
            }
        }
    
        private class UploadResponse
        {
            public string uploadId { get; set; }
            public List<PartETag> eTags { get; set; }
            public bool success { get; set; }
    
            public UploadResponse(string Id, List<PartETag> Tags, bool succeeded)
            {
                uploadId = Id;
                eTags = Tags;
                success = succeeded;
            }
        }
    
        private void OnPartUploadProgressUpdate(int numParts, UploadPartRequest request, StreamTransferProgressArgs e)
        {
            // Process event. 
            if (OnProgressUpdated != null)
            {
                int partIndex = request.PartNumber - 1;
                int totalIncrements = numParts * 100;
                int percentDone = (int)Math.Floor((double)(partIndex * 100 + e.PercentDone) / (double)totalIncrements * 100);
                OnProgressUpdated(this, new ProgressUpdatedEventArgs(percentDone));
            }
        }
    
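        private long GetPartSize(long fileSize, long partSize)
        {
            // S3 multi part limits
            //====================================
            // min part size = 5MB (except the last part)
            // max part size = 5GB
            // total number of parts = 10,000
            //====================================
            const long MiB = 1024 * 1024;
            // each part is read into a byte[] indexed by int, so keep parts well below int.MaxValue (~2 GiB)
            const long maxPartSize = 2000 * MiB;

            if (partSize <= 0)
            {
                partSize = 16 * MiB;    // default part size to 16MB
            }

            if (fileSize < partSize)
            {
                return fileSize;    // a single part; the 5MB minimum does not apply to the last part
            }

            partSize = Math.Max(partSize, 5 * MiB);
            partSize = Math.Min(partSize, maxPartSize);

            // keep the number of parts at or below 10,000 (round up so we never produce 10,001 parts)
            if ((fileSize + partSize - 1) / partSize > 10000)
            {
                partSize = (fileSize + 9999) / 10000;
            }

            return partSize;
        }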
    
    }
    
    public class ProgressUpdatedEventArgs : EventArgs
    {
        public ProgressUpdatedEventArgs(int iPercentDone)
        { PercentDone = iPercentDone; }
        public int PercentDone { get; set; }
    }
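The retry scheme from step 3 (a queue of part indexes, failed parts re-queued at the back, a pause after each full pass) can be exercised without S3. This is a simplified sketch, not the answer's exact code: `RetryQueue.RunAsync` is a hypothetical name and `uploadPart` is a stand-in for the real `UploadPartAsync` call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class RetryQueue
{
    // uploadPart: hypothetical stand-in for the S3 call; returns true on success.
    // Returns the number of passes used; throws once maxRetries extra passes still leave failures.
    public static async Task<int> RunAsync(int partCount, Func<int, Task<bool>> uploadPart,
                                           int maxRetries, TimeSpan retryWait)
    {
        var queue = new Queue<int>(Enumerable.Range(0, partCount));
        int remainingInPass = partCount;    // queue entries left in the current pass
        int passes = 1;
        while (queue.Count > 0)
        {
            int part = queue.Dequeue();
            remainingInPass--;
            if (!await uploadPart(part))
                queue.Enqueue(part);    // retry this part in the next pass

            // End of a pass with failures still queued: give up, or pause and start another pass.
            if (remainingInPass == 0 && queue.Count > 0)
            {
                if (passes > maxRetries)
                    throw new InvalidOperationException($"{queue.Count} part(s) still failing after {maxRetries} retries");
                remainingInPass = queue.Count;
                passes++;
                await Task.Delay(retryWait);    // non-blocking pause between passes
            }
        }
        return passes;
    }
}
```

Counting the entries left in each pass, rather than comparing a part index with the queue length, guarantees that every pass ends with a pause and advances the retry counter, so a part that keeps failing cannot loop forever.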
    

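    【Discussion】:

    • I'm looking into this because we have run into a similar problem, but I'm curious whether you have some dependencies elsewhere. Specifically, I can't see any reference to ProgressUpdatedEventArgs, OnProgressUpdated or CheckFilePath in your code. By the way, thanks a lot for writing all this up, it's much appreciated!
    • Hi, yes, sorry, I didn't include some dependencies in my original snippet. I've updated the post with the using statements and methods so that it includes all required dependencies.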