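[Posted]: 2021-11-26 16:57:12
[Problem description]:
I have the code below, which downloads multiple parts of a file in parallel and writes them using a memory-mapped file. The problem is in the DownloadFile() function. The file starts downloading correctly but gets corrupted along the way. For example, if I try to download an image, some parts of it come out corrupted. I'm not sure whether this comes from some kind of race condition in the code or whether it has to do with how the content ranges of the parts are calculated. Any help on how or why the problem occurs would be greatly appreciated, thanks!
Minimal, reproducible example: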
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace ZenTester
{
internal class FileChunk
{
public long Start { get; set; }
public long End { get; set; }
public FileChunk(){}
public int Id { get; set; }
public FileChunk(long startByte, long endByte)
{
Start = startByte;
End = endByte;
}
}
internal class RetryHandler : DelegatingHandler
{
private int _maxRetries = 3;
public RetryHandler(HttpMessageHandler innerHandler) : base(innerHandler) { }
public RetryHandler(HttpMessageHandler innerHandler, int maxRetries) : base(innerHandler)
{
_maxRetries = maxRetries;
}
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
HttpResponseMessage response = null;
for (var i = 0; i < _maxRetries; i++)
{
response = await base.SendAsync(request, cancellationToken);
if (response.IsSuccessStatusCode)
{
return response;
}
}
return response;
}
}
public static class ZenTester
{
private static async Task DownloadFile(string url, int parts, string outFile = null!)
{
var responseLength = (await WebRequest.Create(url).GetResponseAsync()).ContentLength;
var partSize = (long)Math.Floor(responseLength / (parts + 0.0));
var pieces = new List<FileChunk>();
var uri = new Uri(url);
WriteLine(responseLength.ToString(CultureInfo.InvariantCulture) + " TOTAL SIZE");
WriteLine(partSize.ToString(CultureInfo.InvariantCulture) + " PART SIZE" + "\n");
string filename = outFile ?? Path.GetFileName(uri.LocalPath);
var mmf = MemoryMappedFile.CreateFromFile(filename, FileMode.OpenOrCreate, null, responseLength);
var httpPool = new HttpClient(new RetryHandler(new HttpClientHandler(), 10)) {MaxResponseContentBufferSize = 1000000000};
//Loop to add all the events to the queue
for (long i = 0; i < responseLength; i += partSize)
{
pieces.Add(i + partSize < responseLength
? new FileChunk(i, i + partSize)
: new FileChunk(i, responseLength));
}
await Parallel.ForEachAsync(pieces, parallelOptions, async (piece, cancellationToken) =>
{
var client = httpPool.Get();
var request = new HttpRequestMessage { RequestUri = new Uri(url) };
request.Headers.Range = new RangeHeaderValue(piece.Start, piece.End);
var message = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;
if (message.IsSuccessStatusCode)
{
await using var streamToRead = await message.Content.ReadAsStreamAsync();
var streams = mmf.CreateViewStream(piece.Start, piece.End-piece.Start - 1);
var T = streamToRead.CopyToAsync(streams);
T.Wait();
if (T.IsCompletedSuccessfully)
{
streams.Flush();
streams.Close();
}
}
});
}
public static void Main(string[] args)
{
var url = "https://wallpaperaccess.com/full/2159447.jpg";
var s = DownloadFile(url, 8);
s.Wait();
}
}
}
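[Discussion]:
- @TheodorZoulias Yes, I just tried it, and it is much cleaner than the Parallel.For I was using. Thanks! Now I get a System.AggregateException saying that I cannot expand the length of the stream. I assume this happens in CopyToAsync(), but I don't understand why it would go out of range, since I calculated the sizes beforehand.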
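The "cannot expand the length of the stream" error is consistent with the view being smaller than the response body. HTTP Range values are inclusive at both ends, so a request for piece.Start to piece.End returns End - Start + 1 bytes, while the question's view is created with End - Start - 1 bytes. The following is a minimal sketch of one way to keep the two in agreement, not the asker's final code. SplitIntoRanges, LengthOf, and WriteInRanges are hypothetical helper names, and an in-memory byte array stands in for the ranged HTTP responses so the example runs without a network.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.MemoryMappedFiles;

public static class RangeSketch
{
    // Hypothetical helper: splits [0, totalLength) into contiguous ranges.
    // Start and End are both inclusive, matching HTTP Range semantics.
    public static List<(long Start, long End)> SplitIntoRanges(long totalLength, int parts)
    {
        if (totalLength <= 0) throw new ArgumentOutOfRangeException(nameof(totalLength));
        if (parts <= 0) throw new ArgumentOutOfRangeException(nameof(parts));

        // Never create more ranges than there are bytes.
        long count = Math.Min(parts, totalLength);
        long partSize = totalLength / count;
        var ranges = new List<(long Start, long End)>();
        for (long i = 0; i < count; i++)
        {
            long start = i * partSize;
            // The last range absorbs the remainder of the division.
            long end = (i == count - 1) ? totalLength - 1 : start + partSize - 1;
            ranges.Add((start, end));
        }
        return ranges;
    }

    // Number of bytes covered by an inclusive range.
    public static long LengthOf((long Start, long End) range) => range.End - range.Start + 1;

    // Hypothetical helper: writes `source` to `path` range by range.
    // Each view is sized to exactly the bytes its range will receive.
    public static void WriteInRanges(byte[] source, int parts, string path)
    {
        using var mmf = MemoryMappedFile.CreateFromFile(path, FileMode.Create, null, source.Length);
        foreach (var range in SplitIntoRanges(source.Length, parts))
        {
            long length = LengthOf(range);
            using var view = mmf.CreateViewStream(range.Start, length);
            view.Write(source, (int)range.Start, (int)length);
        }
    }

    public static void Main()
    {
        foreach (var r in SplitIntoRanges(10, 3))
        {
            Console.WriteLine($"{r.Start}-{r.End} ({LengthOf(r)} bytes)");
        }
    }
}
```

For a 10-byte file split into 3 parts, this yields the ranges 0-2, 3-5, and 6-9, which neither overlap nor leave gaps. In the real download, the same Start and End values would go into RangeHeaderValue, and LengthOf would give the size passed to CreateViewStream.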
- Yes, I thought it was useful for the example or related to the corruption, but I have removed it and replaced it with a plain HTTP client.
- As a side note, the HttpClient class is intended to be instantiated once and reused throughout the lifetime of the application.
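The advice about reusing HttpClient could be sketched as follows, under the assumption that a single static instance serves every ranged request. SharedClientSketch, BuildRangeRequest, and FetchRangeAsync are hypothetical names, and the example.com URL is a placeholder. Main only builds a request and prints its Range header, so it runs without touching the network.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

public static class SharedClientSketch
{
    // One HttpClient for the whole process, shared by all concurrent requests.
    private static readonly HttpClient Client = new HttpClient();

    // Hypothetical helper: builds a GET request for the inclusive byte range [start, end].
    public static HttpRequestMessage BuildRangeRequest(Uri uri, long start, long end)
    {
        var request = new HttpRequestMessage(HttpMethod.Get, uri);
        request.Headers.Range = new RangeHeaderValue(start, end);
        return request;
    }

    // Hypothetical helper: fetches one range using the shared client.
    public static async Task<byte[]> FetchRangeAsync(Uri uri, long start, long end)
    {
        using var request = BuildRangeRequest(uri, start, end);
        using var response = await Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);

        // A server that ignores the Range header answers 200 with the full body,
        // so only 206 Partial Content is treated as a valid ranged response here.
        if (response.StatusCode != HttpStatusCode.PartialContent)
        {
            throw new InvalidOperationException($"Expected 206, got {(int)response.StatusCode}.");
        }
        return await response.Content.ReadAsByteArrayAsync();
    }

    public static void Main()
    {
        // Placeholder URL, used only to show the header that would be sent.
        using var request = BuildRangeRequest(new Uri("https://example.com/file.bin"), 0, 99);
        Console.WriteLine(request.Headers.Range); // prints "bytes=0-99"
    }
}
```

Awaiting SendAsync instead of calling .Result or .Wait() keeps the callback passed to Parallel.ForEachAsync from blocking thread-pool threads while a response is in flight.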
Tags: c# asynchronous .net-core networking parallel-processing