【问题标题】:How to make sure that the data of multiple Async downloads are saved in the order they were started?如何确保多个异步下载的数据按启动顺序保存?
【发布时间】:2016-06-06 17:12:16
【问题描述】:

我正在编写一个基本的 Http Live Stream (HLS) 下载器,我正在以“#EXT-X-TARGETDURATION”指定的时间间隔重新下载 m3u8 媒体播放列表,然后将 *.ts 片段下载为它们变得可用。

这是 m3u8 媒体播放列表首次下载时的样子。

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:12
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:7.975,
http://website.com/segment_1.ts
#EXTINF:7.941,
http://website.com/segment_2.ts
#EXTINF:7.975,
http://website.com/segment_3.ts

我想使用 HttpClient async/await 同时下载这些 *.ts 段。这些段的大小不同,因此即使“segment_1.ts”的下载首先开始,它也可能在其他两个段之后完成。

这些片段都是一个大视频的一部分,因此下载片段的数据必须按照它们开始的顺序写入,而不是按照它们完成的顺序。

如果分段一个接一个地下载,我下面的代码可以正常工作,但同时下载多个分段时就不行了,因为有时它们不会按照开始的顺序完成。

我考虑过使用 Task.WhenAll,它可以保证正确的顺序,但我不想将下载的段不必要地保留在内存中,因为它们的大小可能只有几兆字节。如果“segment_1.ts”的下载确实首先完成,那么它应该立即写入磁盘,而不必等待其他段完成。将所有 *.ts 段写入单独的文件并在最后加入它们也不是一种选择,因为它需要双倍的磁盘空间,并且整个视频的大小可能是几 GB。

我不知道该怎么做,我想知道是否有人可以帮助我。我正在寻找一种不需要我手动创建线程或长时间阻塞 ThreadPool 线程的方法。

一些代码和异常处理已被删除,以便更容易看到发生了什么。

// Async BlockingCollection from the AsyncEx library
private AsyncCollection<byte[]> segmentDataQueue = new AsyncCollection<byte[]>();

public void Start()
{
    RunConsumer();
    RunProducer();
}

private async void RunProducer()
{
    while (!_isCancelled)
    {
        var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
        var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);

        string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
        if (!lines.Any() || lines[0] != "#EXTM3U")
            throw new Exception("Invalid m3u8 media playlist.");

        for (var i = 1; i < lines.Length; i++)
        {
           var line = lines[i];
           if (line.StartsWith("#EXT-X-TARGETDURATION"))
           {
               ParseTargetDuration(line);
           }
           else if (line.StartsWith("#EXT-X-MEDIA-SEQUENCE"))
           {
               ParseMediaSequence(line);
           }
           else if (!line.StartsWith("#"))
           {
               if (_isNewSegment)
               {

                   // Fire and forget
                   DownloadTsSegment(line);

               }
           }
        }

        // Wait until it's time to reload the m3u8 media playlist again
        await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
    }
}

// async void. We never await this method, so we can download multiple segments at once
private async void DownloadTsSegment(string tsUrl)
{
    var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
    var data = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);

    // Add the downloaded segment data to the AsyncCollection
    await segmentDataQueue.AddAsync(data, _cts.Token).ConfigureAwait(false);
}

private async void RunConsumer()
{
    using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read))
    {
        while (!_isCancelled)
        {
            // Wait until new segment data is added to the AsyncCollection and write it to disk
            var data = await segmentDataQueue.TakeAsync(_cts.Token).ConfigureAwait(false);
            await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
        }
    }
}

【问题讨论】:

  • 我认为除了您已经知道的方法之外,没有其他方法了。 1)将段保存到磁盘并在所有段完成后加入。 2) 使用Task.WhenAll 保证订单 3) 不使用Fire and Forget 保证订单,但对每个段下载使用await。 3 个选项中的每一个都有自己的优点,但也有缺点。这里没有灵丹妙药,您必须选择最适合您能够接受/符合要求的解决方案。

标签: c# async-await httpclient http-live-streaming m3u8


【解决方案1】:

我认为您在这里根本不需要生产者/消费者队列。但是,我确实认为您应该避免“一劳永逸”。

您可以同时启动它们,并在它们完成时处理它们。

首先,定义如何下载单个片段:

private async Task<byte[]> DownloadTsSegmentAsync(string tsUrl)
{
  var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
  return await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
}

然后添加对播放列表的解析,这会产生一个列表片段下载(所有这些都已经在进行中):

private List<Task<byte[]>> DownloadTasks(string data)
{
  var result = new List<Task<byte[]>>();
  string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
  if (!lines.Any() || lines[0] != "#EXTM3U")
    throw new Exception("Invalid m3u8 media playlist.");
  ...
           if (_isNewSegment)
           {
             result.Add(DownloadTsSegmentAsync(line));
           }
  ...
  return result;
}

通过写入文件一次(按顺序)使用此列表:

private async Task RunConsumerAsync(List<Task<byte[]>> downloads)
{
  using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read))
  {
    for (var task in downloads)
    {
      var data = await task.ConfigureAwait(false);
      await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
    }
  }
}

和制作人一起开始:

public async Task RunAsync()
{
  // TODO: consider CancellationToken instead of a boolean.
  while (!_isCancelled)
  {
    var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
    var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);

    var tasks = DownloadTasks(data);
    await RunConsumerAsync(tasks);

    await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
  }
}

请注意,此解决方案确实同时运行所有下载,这可能会导致内存压力。如果这是一个问题,我建议您重组以使用 TPL Dataflow,它内置了对节流的支持。

【讨论】:

  • memory pressure OP 表示他的文件大小可以为 GB。大到他甚至想使用临时文件。
  • 非常感谢,效果很好。唯一的小问题是 RunAsync 方法中的 Task.Delay 运行太晚。在我上面发布的 m3u8 播放列表示例中,目标持续时间是 12 秒。播放列表必须每 12 秒重新下载一次,无论片段下载是否完成。使用当前代码,它首先下载段,然后等待 12 秒。结果是重新下载的播放列表中缺少片段,因为重新下载太晚了。
  • @usr 我实际上是在尝试避免使用临时文件。我最终可能会得到数百甚至 1000 多个临时文件,每个文件大小在 0.5 到 3 MB 之间。直播结束后,所有这些临时文件都必须加入,需要双倍的磁盘空间。
  • @ChrisDonovan:在这种情况下,您可以选择不使用awaitRunConsumerAsync 返回的任务。但是,您需要认真考虑该方法中的异常处理(即,将其包装在 try/catch 中并执行某些操作)。
【解决方案2】:

为每个下载分配一个序列号。将结果放入Dictionary&lt;int, byte[]&gt;。每次下载完成时,它都会添加自己的结果。

然后检查是否有段要写入磁盘:

while (dict.ContainsKey(lowestWrittenSegmentNumber + 1)) {
 WriteSegment(dict[lowestWrittenSegmentNumber + 1]);
 lowestWrittenSegmentNumber++;
}

这样,所有段最终都按顺序和缓冲存储在磁盘上。


RunConsumer();
RunProducer();

确保使用async Task,以便您可以使用await Task.WhenAll(RunConsumer(), RunProducer()); 等待完成。但是您应该不再需要RunConsumer

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-22
    • 1970-01-01
    相关资源
    最近更新 更多