我决定放弃使用高级别的 TransferUtility API，因为它似乎不适合处理大文件：它看起来会先把整个文件加载到内存中，再将其拆分成多个部分逐一上传。对于大文件，这会耗尽所有可用内存，甚至可能导致服务器停止响应。
对于感兴趣的人，下面是我解决这个问题的方法：
- 我现在改用低级 API 方法 InitiateMultipartUploadAsync、UploadPartAsync 和 CompleteMultipartUploadAsync，自行管理分段上传。
- 关键在于使用 .NET 的 MemoryMappedFile 类及其 CreateViewStream 方法，确保每次只把文件的一个分段读入内存。
- 我使用队列来跟踪尚未成功上传的分段，并对上传失败的单个分段进行重试。
这是我的新代码：
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Generic;
using System.IO.MemoryMappedFiles;
using System.Linq;
using Amazon.Runtime;
/// <summary>
/// Uploads files to an S3 bucket with the low-level multipart API
/// (InitiateMultipartUpload / UploadPart / CompleteMultipartUpload).
/// The file is memory-mapped and only one part at a time is copied into managed memory.
/// Parts that fail are retried in later passes, with a pause between passes.
/// </summary>
public class S3Upload
{
    // Bucket that receives the uploaded objects.
    private readonly string _bucketName;
    // Region used to construct the S3 client.
    private readonly RegionEndpoint _region;

    /// <summary>
    /// Raised while part data is being transferred. PercentDone is an overall percentage in which
    /// every part is weighted equally, regardless of its byte size.
    /// </summary>
    public event EventHandler<ProgressUpdatedEventArgs> OnProgressUpdated;

    /// <summary>
    /// Kept for source compatibility with code that relied on the implicit default constructor.
    /// It leaves the bucket name and region unset; prefer <see cref="S3Upload(string, RegionEndpoint)"/>.
    /// </summary>
    public S3Upload()
    {
    }

    /// <summary>Creates an uploader that writes to <paramref name="bucketName"/> in <paramref name="region"/>.</summary>
    /// <param name="bucketName">Target bucket.</param>
    /// <param name="region">Region of the target bucket.</param>
    /// <exception cref="ArgumentException"><paramref name="bucketName"/> is null or empty.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="region"/> is null.</exception>
    public S3Upload(string bucketName, RegionEndpoint region)
    {
        if (string.IsNullOrEmpty(bucketName))
        {
            throw new ArgumentException("A bucket name is required.", "bucketName");
        }
        if (region == null)
        {
            throw new ArgumentNullException("region");
        }
        _bucketName = bucketName;
        _region = region;
    }

    /// <summary>Returns true when <paramref name="filePath"/> names an existing, accessible file.</summary>
    private bool CheckFilePath(string filePath)
    {
        // File.Exists already returns false for null/empty/invalid paths and for missing directories.
        // The former Directory.Exists(Path.GetDirectoryName(...)) check wrongly rejected bare file
        // names such as "data.bin", whose directory part is "".
        return File.Exists(filePath);
    }

    /// <summary>
    /// Uploads <paramref name="filePath"/> to the configured bucket under <paramref name="keyName"/>
    /// using a multipart upload.
    /// </summary>
    /// <param name="keyName">Object key to create.</param>
    /// <param name="filePath">Local file to upload; must exist and be non-empty.</param>
    /// <param name="storageClass">S3 storage class name; null or empty sends no storage class.</param>
    /// <param name="partSizeMB">Requested part size in MB; adjusted to S3's limits before use.</param>
    /// <param name="retryWaitInterval">Pause, in milliseconds, between retry passes.</param>
    /// <param name="maxRetriesOnFail">Number of retry passes allowed after the first pass before the upload is aborted.</param>
    /// <exception cref="DirectoryNotFoundException">The file does not exist or cannot be accessed.</exception>
    /// <exception cref="ArgumentException">The file is empty.</exception>
    /// <exception cref="InvalidOperationException">The upload could not be finished within the allowed retries.</exception>
    public async Task UploadFileMultiPartAsync(string keyName, string filePath, string storageClass,
                                               int partSizeMB = 16, int retryWaitInterval = 60000,
                                               int maxRetriesOnFail = 10)
    {
        if (!CheckFilePath(filePath))
        {
            string errMsg = string.Format("Cannot find file {0}. Check the file exists and that the application has access permissions.", filePath);
            throw new DirectoryNotFoundException(errMsg);
        }

        long fileSize = new FileInfo(filePath).Length;
        if (fileSize == 0)
        {
            // An empty file cannot be memory-mapped and produces zero parts.
            throw new ArgumentException("Cannot upload an empty file using a multipart upload.", "filePath");
        }

        long partSize = GetPartSize(fileSize, partSizeMB * 1024L * 1024L);
        S3StorageClass sClass = string.IsNullOrEmpty(storageClass) ? null : new S3StorageClass(storageClass);

        // Exceptions propagate unchanged. They used to be re-wrapped in a bare Exception, which lost
        // both their type and their stack trace.
        await UploadFileMultiPartAsync(keyName, filePath, fileSize, partSize, sClass, retryWaitInterval, maxRetriesOnFail);
    }

    /// <summary>
    /// Runs the multipart upload.
    /// 1. Initiates the upload.
    /// 2. Uploads every part, retrying failed parts in later passes.
    /// 3. Completes the upload.
    /// If it still cannot finish after <paramref name="maxRetriesOnFail"/> retry passes, the upload is aborted.
    /// </summary>
    private async Task UploadFileMultiPartAsync(string keyName, string filePath, long fileSize,
                                                long partSize, S3StorageClass storageClass,
                                                int retryWaitInterval,
                                                int maxRetriesOnFail)
    {
        // Integer ceiling division: number of parts needed to cover the file.
        int partCount = (int)((fileSize + partSize - 1) / partSize);

        // ETags are keyed, and therefore sorted, by part number. S3 expects ascending part numbers on
        // completion. A part that is re-uploaded simply replaces its earlier entry instead of duplicating it.
        var eTags = new SortedDictionary<int, PartETag>();
        string uploadId = null;
        Exception lastError = null;
        int retryCount = 0;

        // 0-based indexes of the parts still to upload. After each pass this holds only the parts that failed.
        List<int> pending = Enumerable.Range(0, partCount).ToList();

        // A single client is shared by every request of this upload.
        using (var s3Client = new AmazonS3Client(_region))
        // The map name is null: named maps are not supported on non-Windows platforms, and a fixed
        // name would collide between concurrent uploads. The file is only read, so it is mapped read-only.
        using (var mmf = MemoryMappedFile.CreateFromFile(filePath, FileMode.Open, null, 0, MemoryMappedFileAccess.Read))
        {
            while (true)
            {
                var failed = new List<int>();

                // Initiate once. If it fails, the whole pass counts as failed and is retried after the pause,
                // instead of re-issuing Initiate once per part.
                if (uploadId == null)
                {
                    try
                    {
                        uploadId = await InitiateMultipartUploadAsync(s3Client, keyName, storageClass);
                    }
                    catch (Exception ex)
                    {
                        lastError = ex;
                        failed.AddRange(pending);
                    }
                }

                if (uploadId != null)
                {
                    foreach (int partIndex in pending)
                    {
                        try
                        {
                            eTags[partIndex + 1] = await UploadPartAsync(s3Client, mmf, keyName, uploadId,
                                                                         partIndex, partCount, fileSize, partSize);
                        }
                        catch (Exception ex)
                        {
                            // Deliberately not rethrown: the part is retried on the next pass.
                            lastError = ex;
                            failed.Add(partIndex);
                        }
                    }
                }

                if (uploadId != null && failed.Count == 0)
                {
                    try
                    {
                        await CompleteMultipartUploadAsync(s3Client, keyName, uploadId, eTags.Values.ToList());
                        return;
                    }
                    catch (Exception ex)
                    {
                        // Every part is uploaded; only the completion call is retried on the next pass.
                        lastError = ex;
                    }
                }

                // Pass-based retry counting: each full pass over the pending parts consumes one retry.
                if (retryCount >= maxRetriesOnFail)
                {
                    throw await AbortAfterRetriesExhaustedAsync(s3Client, keyName, uploadId, lastError);
                }

                retryCount++;
                // Non-blocking pause; Thread.Sleep would block a thread inside an async method.
                await Task.Delay(retryWaitInterval);
                pending = failed;
            }
        }
    }

    /// <summary>Starts a multipart upload and returns its upload id.</summary>
    private async Task<string> InitiateMultipartUploadAsync(AmazonS3Client s3Client, string keyName, S3StorageClass storageClass)
    {
        var initiateRequest = new InitiateMultipartUploadRequest
        {
            BucketName = _bucketName,
            Key = keyName,
            // Previously the requested storage class was parsed but never sent.
            StorageClass = storageClass
        };
        InitiateMultipartUploadResponse initResponse = await s3Client.InitiateMultipartUploadAsync(initiateRequest);
        return initResponse.UploadId;
    }

    /// <summary>
    /// Uploads one part and returns its ETag. Only this part's bytes are copied out of the memory-mapped file.
    /// </summary>
    private async Task<PartETag> UploadPartAsync(AmazonS3Client s3Client, MemoryMappedFile mmf, string keyName,
                                                 string uploadId, int partIndex, int partCount,
                                                 long fileSize, long partSize)
    {
        int partNumber = partIndex + 1; // S3 part numbers are 1-based
        long offset = partIndex * partSize;
        long chunkSize = Math.Min(partSize, fileSize - offset);

        // Copy exactly chunkSize bytes. The view stream's Length is not used because it may be larger
        // than the requested size, which would pad the last part.
        // NOTE(review): the view is reportedly rounded up to the page size on some runtimes.
        byte[] bytes;
        using (var view = mmf.CreateViewStream(offset, chunkSize, MemoryMappedFileAccess.Read))
        using (var reader = new BinaryReader(view))
        {
            bytes = reader.ReadBytes((int)chunkSize);
        }
        if (bytes.Length != chunkSize)
        {
            throw new EndOfStreamException(string.Format("Could only read {0} of {1} bytes for part {2}.", bytes.Length, chunkSize, partNumber));
        }

        using (var partStream = new MemoryStream(bytes, false))
        {
            var uploadRequest = new UploadPartRequest
            {
                BucketName = _bucketName,
                Key = keyName,
                UploadId = uploadId,
                PartNumber = partNumber,
                InputStream = partStream,
                // The last part is the one with the highest part number, not the one that happens to be sent last.
                IsLastPart = partNumber == partCount,
                PartSize = partStream.Length
            };
            uploadRequest.StreamTransferProgress +=
                (_, e) => OnPartUploadProgressUpdate(partCount, partIndex, e);

            UploadPartResponse uploadResponse = await s3Client.UploadPartAsync(uploadRequest);
            return new PartETag
            {
                PartNumber = partNumber,
                ETag = uploadResponse.ETag
            };
        }
    }

    /// <summary>Completes the multipart upload from the given part ETags, which must be in ascending part-number order.</summary>
    private async Task CompleteMultipartUploadAsync(AmazonS3Client s3Client, string keyName, string uploadId, List<PartETag> eTags)
    {
        var completeRequest = new CompleteMultipartUploadRequest
        {
            BucketName = _bucketName,
            Key = keyName,
            UploadId = uploadId,
            PartETags = eTags
        };
        await s3Client.CompleteMultipartUploadAsync(completeRequest);
    }

    /// <summary>Aborts the multipart upload identified by <paramref name="uploadId"/>.</summary>
    private async Task AbortMultiPartUploadAsync(AmazonS3Client s3Client, string keyName, string uploadId)
    {
        var abortMPURequest = new AbortMultipartUploadRequest
        {
            BucketName = _bucketName,
            Key = keyName,
            UploadId = uploadId
        };
        await s3Client.AbortMultipartUploadAsync(abortMPURequest);
    }

    /// <summary>
    /// Called once all retries are used up. Aborts the upload (if one was initiated) and returns
    /// the exception the caller should throw.
    /// </summary>
    private async Task<Exception> AbortAfterRetriesExhaustedAsync(AmazonS3Client s3Client, string keyName,
                                                                  string uploadId, Exception lastError)
    {
        if (uploadId == null)
        {
            return new InvalidOperationException("Multi part upload could not be initiated. Maximum number of retries reached.", lastError);
        }

        try
        {
            await AbortMultiPartUploadAsync(s3Client, keyName, uploadId);
        }
        catch (Exception abortEx)
        {
            string abortErrMsg = string.Format("Multi part upload failed. Maximum number of retries reached. Unable to abort upload. Error: {0}", abortEx.Message);
            return new InvalidOperationException(abortErrMsg, abortEx);
        }

        // Only reached when the abort itself succeeded. The old code threw this message inside its own
        // try block, so the "unable to abort" message was always reported instead.
        return new InvalidOperationException("Multi part upload aborted. Some parts could not be uploaded. Maximum number of retries reached.", lastError);
    }

    /// <summary>Raises OnProgressUpdated with an overall percentage, weighting each part equally.</summary>
    private void OnPartUploadProgressUpdate(int numParts, int partIndex, StreamTransferProgressArgs e)
    {
        // Copy to a local so an unsubscribe between the null check and the call cannot cause a NullReferenceException.
        EventHandler<ProgressUpdatedEventArgs> handler = OnProgressUpdated;
        if (handler == null)
        {
            return;
        }
        int totalIncrements = numParts * 100;
        int percentDone = (int)Math.Floor((double)(partIndex * 100 + e.PercentDone) / (double)totalIncrements * 100);
        handler(this, new ProgressUpdatedEventArgs(percentDone));
    }

    /// <summary>
    /// Adjusts a requested part size so that the upload stays within S3's multipart limits.
    /// </summary>
    /// <param name="fileSize">Size of the file in bytes; the caller guarantees it is greater than 0.</param>
    /// <param name="partSize">Requested part size in bytes; 0 or negative selects the 16 MB default.</param>
    private long GetPartSize(long fileSize, long partSize)
    {
        // S3 multipart limits
        //====================================
        // min part size = 5MB (the last part may be smaller)
        // max part size = 5GB
        // total number of parts = 10,000
        //====================================
        // Each part is also buffered in a single byte[] before upload, so this implementation caps
        // parts at 2000 MB. That cap still allows 10,000 parts to cover any object S3 accepts.
        const long OneMB = 1024L * 1024L;
        const long DefaultPartSize = 16 * OneMB;
        const long MinPartSize = 5 * OneMB;
        const long MaxPartSize = 2000 * OneMB;
        const long MaxPartCount = 10000;

        if (partSize <= 0)
        {
            partSize = DefaultPartSize;
        }
        if (partSize < MinPartSize)
        {
            partSize = MinPartSize;
        }
        if (partSize > MaxPartSize)
        {
            partSize = MaxPartSize;
        }

        // Smallest part size that keeps the part count at or below 10,000.
        // Ceiling division: floor division could still yield 10,001 parts.
        long minSizeForPartCount = (fileSize + MaxPartCount - 1) / MaxPartCount;
        if (partSize < minSizeForPartCount)
        {
            partSize = minSizeForPartCount;
        }

        // A file smaller than one part is uploaded as a single (possibly < 5MB) last part.
        if (partSize > fileSize)
        {
            partSize = fileSize;
        }
        return partSize;
    }
}
/// <summary>
/// Event data for S3Upload.OnProgressUpdated: the overall upload progress as a whole-number percentage.
/// </summary>
public class ProgressUpdatedEventArgs : EventArgs
{
    /// <summary>Creates the event data.</summary>
    /// <param name="iPercentDone">Overall percentage of the upload completed.</param>
    public ProgressUpdatedEventArgs(int iPercentDone)
    {
        this.PercentDone = iPercentDone;
    }

    /// <summary>Overall percentage of the upload completed.</summary>
    public int PercentDone
    {
        get;
        set;
    }
}