【问题标题】:Parallel file download corruption并行文件下载损坏
【发布时间】:2021-11-26 16:57:12
【问题描述】:

我在下面有这段代码,它能够并行下载文件的多个部分并使用内存映射文件写入它们。问题出现在DownloadFile() 函数中。该文件正在正确开始下载,但在此过程中已损坏。例如,如果我尝试下载图像,它的某些部分将被损坏。我不确定这是否来自代码中的某种竞争传导,或者它是否与部件的内容范围计算有关。任何有关问题如何发生或发生的帮助将不胜感激,谢谢!

最小的、可重现的例子:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;

namespace ZenTester
{
internal class FileChunk
{
    public long Start { get; set; }
    public long End { get; set; }
    public FileChunk(){}
    public int Id { get; set; }

    public FileChunk(long startByte, long endByte)
    {
        Start = startByte;
        End = endByte;
    }
}

internal class RetryHandler : DelegatingHandler
{
    private int _maxRetries = 3;

    public RetryHandler(HttpMessageHandler innerHandler) : base(innerHandler) { }

    public RetryHandler(HttpMessageHandler innerHandler, int maxRetries) : base(innerHandler)
    {
        _maxRetries = maxRetries;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        HttpResponseMessage response = null;
        for (var i = 0; i < _maxRetries; i++)
        {
            response = await base.SendAsync(request, cancellationToken);
            if (response.IsSuccessStatusCode)
            {
                return response;
            }
        }

        return response;
    }
}

public static class ZenTester
{
    private static async Task DownloadFile(string url, int parts, string outFile = null!)
    {
        var responseLength = (await WebRequest.Create(url).GetResponseAsync()).ContentLength;
        var partSize = (long)Math.Floor(responseLength / (parts + 0.0));
        var pieces = new List<FileChunk>();
        var uri = new Uri(url);
        
        WriteLine(responseLength.ToString(CultureInfo.InvariantCulture) + " TOTAL SIZE");
        WriteLine(partSize.ToString(CultureInfo.InvariantCulture) + " PART SIZE" + "\n");
        
        string filename = outFile ?? Path.GetFileName(uri.LocalPath);

        var mmf = MemoryMappedFile.CreateFromFile(filename, FileMode.OpenOrCreate, null, responseLength);

        var httpPool = new HttpClient(new RetryHandler(new HttpClientHandler(), 10)) {MaxResponseContentBufferSize = 1000000000};
        
        //Loop to add all the events to the queue
        for (long i = 0; i < responseLength; i += partSize)
        {
            pieces.Add(i + partSize < responseLength
                ? new FileChunk(i, i + partSize)
                : new FileChunk(i, responseLength));
        }

        await Parallel.ForEachAsync(pieces, parallelOptions, async (piece, cancellationToken) =>
        {
            var client = httpPool.Get();
            var request = new HttpRequestMessage { RequestUri = new Uri(url) };
            request.Headers.Range = new RangeHeaderValue(piece.Start, piece.End);
                
            var message = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;

            if (message.IsSuccessStatusCode)
            {
                await using var streamToRead = await message.Content.ReadAsStreamAsync();
                var streams = mmf.CreateViewStream(piece.Start, piece.End-piece.Start - 1);
                var T = streamToRead.CopyToAsync(streams);
                T.Wait();
                if (T.IsCompletedSuccessfully)
                {
                    streams.Flush();
                    streams.Close();
                }
            }
        });
        
    }
    
    public static void Main(string[] args)
    {
        var url = "https://wallpaperaccess.com/full/2159447.jpg";

        var s = DownloadFile(url, 8);
        
        s.Wait();
    }
}
}

【问题讨论】:

  • @TheodorZoulias 是的,我刚试过,它比我使用的 Parallel For 干净得多。谢谢!现在我得到一个 System.AggregateException 说我不能扩展流的长度。我假设这发生在 CopyToAsync() 中,但我不明白为什么这会超出范围,因为我事先计算过
  • 是的,我认为这对示例很有用或与损坏有关,但我将其删除并用普通的 HTTP 客户端替换它。
  • 附带说明,HttpClient 类旨在实例化 once,并在应用程序的整个生命周期中重复使用。

标签: c# asynchronous .net-core networking parallel-processing


【解决方案1】:

我发现损坏正在发生,因为某些范围会大于 Start-End。我解决了这个问题,但在内存映射文件和视图流中都添加了一个读写修饰符。然后我把视图流的大小改成sendAsync请求返回的内容的长度:

var streams = mmf.CreateViewStream(piece.Start, message.Content.Headers.ContentLength!.Value,
                        MemoryMappedFileAccess.ReadWrite);

【讨论】:

  • 与使用单个异步操作下载文件相比,您是否发现任何性能改进? (不平行)
  • @TheodorZoulias 是的,我有。对于较小的文件不是很明显,但较大的文件(1+ GB)与单个操作相比非常明显。
  • 有趣。我希望所有可用带宽都用于下载文件。你能分享一些指标吗?与一次操作相比,使用 2、3、4 次并行操作下载相同的 1+GB 文件要快多少?
  • @TheodorZoulias 我通过将“pieces”(进程)变量更改为 1、2、3、4,从 speed.hetzner.de 下载了 1GB 测试文件。 1 次操作我得到 7:39,2 次操作 6:34,3 次操作 5:44,4 次操作 5:36。
  • 我确信进程数有一个“最佳位置”。目前,只是尝试优化代码并确保它不会在任何地方阻塞。
猜你喜欢
  • 2015-07-14
  • 2015-07-16
  • 2017-12-04
  • 2019-01-28
  • 1970-01-01
  • 2017-09-13
  • 2018-03-20
  • 2019-08-19
  • 2023-03-05
相关资源
最近更新 更多