【问题标题】:Dumping Observable<byte[]> to a stream将 Observable<byte[]> 转储到流中
【发布时间】:2019-03-10 21:39:00
【问题描述】:

我目前有一个Observable&lt;byte[]&gt;,实际上是一个源文件的块序列,使用this 方法。

它将流“分块”为byte[] 的序列。

问题是,给定这个序列,我想将它写入目标流。换句话说,我必须将每个 byte[] 转储到文件流中,直到序列完成,而且,我应该等待直到序列完成。

到目前为止,我创建的这段代码有效,但恐怕它不是正确的方法。处理IObservable&lt;byte[]&gt; 的相关部分是Download 方法。

async Task Main()
{
    using (var httpClient = new HttpClient()) 
    {
        var downloader = new HttpDownloader(httpClient);
        var destinationPath = Path.Combine(Path.GetTempPath(), "test.zip");
        await downloader.Download("https://github.com/gus33000/MSM8994-8992-NT-ARM64-Drivers/archive/master.zip", destinationPath);
        Console.WriteLine("File downloaded to " + destinationPath);
    }   
}

public class HttpDownloader
{
    private readonly HttpClient client;

    public HttpDownloader(HttpClient client)
    {
        this.client = client;
    }

    public async Task Download(string url, string path, IDownloadProgress progressObserver = null, int timeout = 30)
    {
        using (var fileStream = File.OpenWrite(path))
        {
            await Download(url, fileStream, progressObserver, timeout);
        }
    }

    private async Task Download(string url, Stream destination, IDownloadProgress progressObserver = null,
        int timeout = 30)
    {
        long? totalBytes = 0;
        long bytesWritten = 0;

        await ObservableMixin.Using(() => client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead),
                s =>
                {
                    totalBytes = s.Content.Headers.ContentLength;
                    if (!totalBytes.HasValue)
                    {
                        progressObserver?.Percentage.OnNext(double.PositiveInfinity);
                    }
                    return ObservableMixin.Using(() => s.Content.ReadAsStreamAsync(),
                        contentStream => contentStream.ReadToEndObservable());
                })
            .Do(bytes =>
            {
                bytesWritten += bytes.Length;
                if (totalBytes.HasValue)
                {
                    progressObserver?.Percentage.OnNext((double)bytesWritten / totalBytes.Value);
                }

                progressObserver?.BytesDownloaded?.OnNext(bytesWritten);
            })
            .Timeout(TimeSpan.FromSeconds(timeout))
            .Select(bytes => Observable.FromAsync(async () =>
            {
                await destination.WriteAsync(bytes, 0, bytes.Length);
                return Unit.Default;
            }))
            .Merge(1);
    }

    private static readonly int BufferSize = 8192;

    public async Task<Stream> GetStream(string url, IDownloadProgress progress = null, int timeout = 30)
    {
        var tmpFile = Path.Combine(Path.GetTempPath(), Path.GetTempFileName());
        var stream = File.Create(tmpFile, BufferSize, FileOptions.DeleteOnClose);

        await Download(url, stream, progress, timeout);
        return stream;
    }
}

public interface IDownloadProgress
{
    ISubject<double> Percentage { get; set; }
    ISubject<long> BytesDownloaded { get; set; }
}

public static class ObservableMixin
{
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable =>
        Observable.FromAsync(resourceFactoryAsync).SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
}

public static class StreamExtensions
{
    internal const int defaultBufferSize = 4096;

    public static IObservable<byte[]> ReadToEndObservable(this Stream stream)
    {
        return stream.ReadToEndObservable(new byte[defaultBufferSize]);
    }

    public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    {
        return stream.ReadToEndObservable(new byte[bufferSize]);
    }

    internal static IObservable<byte[]> ReadToEndObservable(this Stream stream, byte[] buffer)
    {
        return Observable.Create<byte[]>(
            observer =>
            {
                var subscription = new SerialDisposable();

                return new CompositeDisposable(
                    subscription,
                    Scheduler.Immediate.Schedule(
                        self =>
                        {
                            bool continueReading = true;

                            subscription.SetDisposableIndirectly(() =>
                                stream.ReadObservable(buffer).SubscribeSafe(
                                    data =>
                                    {
                                        if (data.Length > 0)
                                        {
                                            observer.OnNext(data);
                                        }
                                        else
                                        {
                                            continueReading = false;
                                        }
                                    },
                                    observer.OnError,
                                    () =>
                                    {
                                        if (continueReading)
                                        {
                                            self();
                                        }
                                        else
                                        {
                                            observer.OnCompleted();
                                        }
                                    }));
                        }));
            });
    }

    internal static IObservable<byte[]> ReadObservable(this Stream stream, byte[] buffer)
    {
        return stream.ReadObservable(buffer, 0, buffer.Length).Select(
            read =>
            {
                byte[] data;

                if (read <= 0)
                {
                    data = new byte[0];
                }
                else if (read == buffer.Length)
                {
                    data = (byte[])buffer.Clone();
                }
                else
                {
                    data = new byte[read];

                    Array.Copy(buffer, data, read);
                }

                return data;
            });
    }

    public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer, int offset, int count)
    {
        return Observable.StartAsync(cancel => stream.ReadAsync(buffer, offset, count, cancel));
    }   
}

public static class SerialDisposableExtensions
{
    public static void SetDisposableIndirectly(this SerialDisposable disposable, Func<IDisposable> factory)
    {
        var indirection = new SingleAssignmentDisposable();

        disposable.Disposable = indirection;

        indirection.Disposable = factory();
    }
}


public static class SafeObservableExtensions
{
    public static IDisposable SubscribeSafe<T>(this IObservable<T> source, Action<T> onNext,
        Action<Exception> onError, Action onCompleted)
    {
        return source.SubscribeSafe(Observer.Create<T>(onNext, onError, onCompleted));
    }
}

看起来还可以吗?

【问题讨论】:

  • 您不能同时使用obs.Subscribe(bytes =&gt; stream.Write(bytes, 0, bytes.Length));await obs;,因为它们会导致对您的obs 进行两个单独的订阅,并且您现在可能存在竞争条件。您需要确保只有一个。
  • 你好@Enigmativity!感谢您查看代码。你能看看编辑过的问题吗?可以吗?再次提前感谢!
  • 你能提供一个minimal reproducible example吗?例如,我找不到ReadToEndObservable 的代码。
  • @Enigmativity 我已经添加了所有代码来运行Download 方法。为方便起见,我还将其添加为 GitHub 存储库:github.com/SuperJMN/HttpDownloader

标签: c# .net stream system.reactive


【解决方案1】:

我最初认为您的ReadToEndObservable 一定有错误,所以我写了这个:

public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    =>
        Observable.Defer<byte[]>(() =>
        {
            var bytesRead = -1;
            var bytes = new byte[bufferSize];
            return
                Observable.While<byte[]>(
                    () => bytesRead != 0,
                    Observable
                        .FromAsync(() => stream.ReadAsync(bytes, 0, bufferSize))
                        .Do(x =>
                        {
                            bytesRead = x;
                        })
                        .Select(x => bytes.Take(x).ToArray()));
        });

它似乎仍然没有工作。

然后我用这个简单的代码试了一下:

IObservable<byte[]> test1 =
    Observable
        .Using(
            () => File.Open(@"{path}\HttpDownloader-master\HttpDownloader-master\HttpDownloader.sln", FileMode.Open),
            s => s.ReadToEndObservable(24));

这适用于我的代码。我用你的试了一下。它奏效了。

我认为您尝试下载的流可能有问题。没有这样的问题 - 只是文件大小为 555MB。

我认为您的代码很好,但是大小太大而且超时了。

【讨论】:

  • 那么,您认为我的代码可以吗?根本没有比赛条件?
  • @SuperJMN - 它似乎在我尝试的流上运行良好。我只是不喜欢你的代码。我认为我的方法更惯用。
  • 我比我更喜欢你的代码!完全地道。我之前误读了你的答案。对不起!我现在完全理解了。我将用它做一些测试,如果它工作正常(我 100% 肯定它会),我会坚持你的解决方案。我喜欢它的简洁!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-05-13
  • 2018-12-15
  • 2017-09-24
  • 2019-02-23
  • 2021-08-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多