【发布时间】:2019-03-10 21:39:00
【问题描述】:
我目前有一个 IObservable<byte[]>,它实际上是源文件的分块序列,是用这个方法(见下文代码中的 ReadToEndObservable)得到的。
它将流“分块”为byte[] 的序列。
问题是:给定这个序列,我想把它写入目标流。换句话说,我需要把每个 byte[] 依次写入文件流,直到序列结束,并且调用方应当能够等待整个序列完成。
到目前为止,我创建的这段代码有效,但恐怕它不是正确的方法。处理IObservable<byte[]> 的相关部分是Download 方法。
/// <summary>
/// Script entry point: downloads a fixed sample archive into the user's
/// temporary directory and reports where it was saved.
/// </summary>
async Task Main()
{
    const string sourceUrl = "https://github.com/gus33000/MSM8994-8992-NT-ARM64-Drivers/archive/master.zip";

    using (var httpClient = new HttpClient())
    {
        // The downloader borrows the client; the client's lifetime is owned by this scope.
        var downloader = new HttpDownloader(httpClient);
        string destinationPath = Path.Combine(Path.GetTempPath(), "test.zip");

        await downloader.Download(sourceUrl, destinationPath);

        Console.WriteLine($"File downloaded to {destinationPath}");
    }
}
/// <summary>
/// Downloads the body of an HTTP resource by streaming it chunk by chunk into a
/// destination stream, optionally reporting progress along the way.
/// </summary>
public class HttpDownloader
{
    /// <summary>Buffer size, in bytes, of the temporary file stream created by <see cref="GetStream"/>.</summary>
    private static readonly int BufferSize = 8192;

    private readonly HttpClient client;

    /// <summary>Creates a downloader that issues its requests through <paramref name="client"/>.</summary>
    /// <param name="client">The HTTP client to use; it is not disposed by this class.</param>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is null.</exception>
    public HttpDownloader(HttpClient client)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        this.client = client;
    }

    /// <summary>
    /// Downloads <paramref name="url"/> into the file at <paramref name="path"/>,
    /// replacing any previous content of that file.
    /// </summary>
    /// <param name="url">Address of the resource to download.</param>
    /// <param name="path">Destination file path.</param>
    /// <param name="progressObserver">Optional sink for progress notifications.</param>
    /// <param name="timeout">Maximum number of seconds to wait for each chunk.</param>
    public async Task Download(string url, string path, IDownloadProgress progressObserver = null, int timeout = 30)
    {
        // FileMode.Create truncates an existing file. File.OpenWrite does not, so a
        // download shorter than the previous file would keep stale trailing bytes.
        using (var fileStream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None))
        {
            await Download(url, fileStream, progressObserver, timeout);
        }
    }

    /// <summary>
    /// Streams the response body of <paramref name="url"/> into <paramref name="destination"/>
    /// and completes once every chunk has been written.
    /// </summary>
    private async Task Download(string url, Stream destination, IDownloadProgress progressObserver = null,
        int timeout = 30)
    {
        long? totalBytes = 0;
        long bytesWritten = 0;

        await ObservableMixin.Using(() => client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead),
                response =>
                {
                    totalBytes = response.Content.Headers.ContentLength;
                    if (!totalBytes.HasValue)
                    {
                        // Unknown length: signal an indeterminate percentage.
                        progressObserver?.Percentage?.OnNext(double.PositiveInfinity);
                    }

                    return ObservableMixin.Using(() => response.Content.ReadAsStreamAsync(),
                        contentStream => contentStream.ReadToEndObservable());
                })
            .Do(bytes =>
            {
                bytesWritten += bytes.Length;
                if (totalBytes.HasValue)
                {
                    progressObserver?.Percentage?.OnNext((double)bytesWritten / totalBytes.Value);
                }

                progressObserver?.BytesDownloaded?.OnNext(bytesWritten);
            })
            .Timeout(TimeSpan.FromSeconds(timeout))
            .Select(bytes => Observable.FromAsync(async () =>
            {
                await destination.WriteAsync(bytes, 0, bytes.Length);
                return Unit.Default;
            }))
            // Merge(1) runs the writes one at a time, preserving chunk order.
            .Merge(1)
            // Awaiting an empty observable throws InvalidOperationException; an empty
            // response body is a legitimate outcome, so fall back to a default element.
            .LastOrDefaultAsync();
    }

    /// <summary>
    /// Downloads <paramref name="url"/> into a temporary file that is deleted when the
    /// returned stream is closed.
    /// </summary>
    /// <param name="url">Address of the resource to download.</param>
    /// <param name="progress">Optional sink for progress notifications.</param>
    /// <param name="timeout">Maximum number of seconds to wait for each chunk.</param>
    /// <returns>A stream positioned at the start of the downloaded content; the caller must dispose it.</returns>
    public async Task<Stream> GetStream(string url, IDownloadProgress progress = null, int timeout = 30)
    {
        // Path.GetTempFileName already returns a full path to a freshly created file.
        var tmpFile = Path.GetTempFileName();
        var stream = File.Create(tmpFile, BufferSize, FileOptions.DeleteOnClose);
        try
        {
            await Download(url, stream, progress, timeout);

            // Rewind so the caller reads the downloaded content, not end-of-stream.
            stream.Position = 0;
            return stream;
        }
        catch
        {
            // Do not leak the file handle (and the temp file) when the download fails.
            stream.Dispose();
            throw;
        }
    }
}
/// <summary>
/// Progress sink for a download. HttpDownloader pushes a value into each subject
/// for every chunk it receives.
/// </summary>
public interface IDownloadProgress
{
/// <summary>
/// Receives the downloaded fraction (bytes received so far divided by the reported
/// content length), or <see cref="double.PositiveInfinity"/> when the response does
/// not report a content length.
/// </summary>
ISubject<double> Percentage { get; set; }
/// <summary>Receives the running total of bytes received so far.</summary>
ISubject<long> BytesDownloaded { get; set; }
}
/// <summary>Helpers that complement the built-in Rx operators.</summary>
public static class ObservableMixin
{
    /// <summary>
    /// Variant of <c>Observable.Using</c> whose resource is produced asynchronously.
    /// The resource is obtained on subscription, handed to
    /// <paramref name="observableFactory"/>, and scoped to the resulting sequence
    /// through <c>Observable.Using</c>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the resulting sequence.</typeparam>
    /// <typeparam name="TResource">Type of the disposable resource.</typeparam>
    /// <param name="resourceFactoryAsync">Asynchronously creates the resource.</param>
    /// <param name="observableFactory">Builds the sequence that consumes the resource.</param>
    /// <returns>The sequence produced by <paramref name="observableFactory"/>.</returns>
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable
    {
        // Single-element sequence carrying the resource once the task completes.
        IObservable<TResource> acquired = Observable.FromAsync(resourceFactoryAsync);

        return acquired.SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
    }
}
/// <summary>Extension methods that expose <see cref="Stream"/> reads as observable sequences.</summary>
public static class StreamExtensions
{
    /// <summary>Buffer size, in bytes, used when the caller does not specify one.</summary>
    internal const int defaultBufferSize = 4096;

    /// <summary>
    /// Reads <paramref name="stream"/> to its end using a buffer of
    /// <see cref="defaultBufferSize"/> bytes, producing one array per non-empty read.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public static IObservable<byte[]> ReadToEndObservable(this Stream stream)
    {
        return stream.ReadToEndObservable(new byte[defaultBufferSize]);
    }

    /// <summary>
    /// Reads <paramref name="stream"/> to its end using a buffer of
    /// <paramref name="bufferSize"/> bytes, producing one array per non-empty read.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
    public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    {
        if (bufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize,
                "The buffer size must be greater than zero.");
        }

        return stream.ReadToEndObservable(new byte[bufferSize]);
    }

    /// <summary>
    /// Reads <paramref name="stream"/> to its end, reusing <paramref name="buffer"/> for every
    /// read. Each emitted array is a private copy, so subscribers may keep it.
    /// The sequence completes after the first read that returns no data.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> or <paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="buffer"/> is empty.</exception>
    internal static IObservable<byte[]> ReadToEndObservable(this Stream stream, byte[] buffer)
    {
        if (stream == null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (buffer.Length == 0)
        {
            // A zero-length read returns 0, which the loop below treats as end-of-stream:
            // the sequence would silently complete without emitting any data.
            throw new ArgumentException("The read buffer must not be empty.", nameof(buffer));
        }

        return Observable.Create<byte[]>(
            observer =>
            {
                // Holds the subscription of the read currently in flight.
                var subscription = new SerialDisposable();

                return new CompositeDisposable(
                    subscription,
                    Scheduler.Immediate.Schedule(
                        self =>
                        {
                            bool continueReading = true;

                            subscription.SetDisposableIndirectly(() =>
                                stream.ReadObservable(buffer).SubscribeSafe(
                                    data =>
                                    {
                                        if (data.Length > 0)
                                        {
                                            observer.OnNext(data);
                                        }
                                        else
                                        {
                                            // An empty chunk marks the end of the stream.
                                            continueReading = false;
                                        }
                                    },
                                    observer.OnError,
                                    () =>
                                    {
                                        if (continueReading)
                                        {
                                            // Schedule the next read.
                                            self();
                                        }
                                        else
                                        {
                                            observer.OnCompleted();
                                        }
                                    }));
                        }));
            });
    }

    /// <summary>
    /// Performs a single read into <paramref name="buffer"/> and emits a copy of the bytes
    /// that were read; an empty array is emitted when the read returns no data.
    /// </summary>
    internal static IObservable<byte[]> ReadObservable(this Stream stream, byte[] buffer)
    {
        return stream.ReadObservable(buffer, 0, buffer.Length).Select(
            read =>
            {
                if (read <= 0)
                {
                    return new byte[0];
                }

                // Copy exactly the bytes read, because the buffer is reused by the next read.
                var data = new byte[read];
                Array.Copy(buffer, data, read);
                return data;
            });
    }

    /// <summary>
    /// Starts an asynchronous read of up to <paramref name="count"/> bytes into
    /// <paramref name="buffer"/> at <paramref name="offset"/> and emits the number of bytes read.
    /// </summary>
    public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer, int offset, int count)
    {
        return Observable.StartAsync(cancel => stream.ReadAsync(buffer, offset, count, cancel));
    }
}
/// <summary>Extension methods for <c>SerialDisposable</c>.</summary>
public static class SerialDisposableExtensions
{
    /// <summary>
    /// Installs a placeholder in <paramref name="disposable"/> first, then invokes
    /// <paramref name="factory"/> and stores its result in that placeholder.
    /// </summary>
    /// <param name="disposable">The serial slot to populate.</param>
    /// <param name="factory">Creates the disposable that ends up in the slot.</param>
    public static void SetDisposableIndirectly(this SerialDisposable disposable, Func<IDisposable> factory)
    {
        var placeholder = new SingleAssignmentDisposable();

        // The placeholder must be in the slot before the factory runs, so that anything
        // the factory does to the slot affects the placeholder rather than a stale value.
        disposable.Disposable = placeholder;

        IDisposable created = factory();
        placeholder.Disposable = created;
    }
}
/// <summary>Delegate-based convenience overload for <c>SubscribeSafe</c>.</summary>
public static class SafeObservableExtensions
{
    /// <summary>
    /// Wraps the three callbacks in an observer and subscribes it to
    /// <paramref name="source"/> through the observer-based <c>SubscribeSafe</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">The sequence to subscribe to.</param>
    /// <param name="onNext">Invoked for each element.</param>
    /// <param name="onError">Invoked when the sequence fails.</param>
    /// <param name="onCompleted">Invoked when the sequence completes.</param>
    /// <returns>A handle that cancels the subscription when disposed.</returns>
    public static IDisposable SubscribeSafe<T>(this IObservable<T> source, Action<T> onNext,
        Action<Exception> onError, Action onCompleted)
    {
        IObserver<T> callbacks = Observer.Create<T>(onNext, onError, onCompleted);
        return source.SubscribeSafe(callbacks);
    }
}
看起来还可以吗?
【问题讨论】:
-
您不能同时使用
obs.Subscribe(bytes => stream.Write(bytes, 0, bytes.Length)); 和 await obs;,因为这会对您的 obs 产生两个独立的订阅,从而可能出现竞争条件。您需要确保只有一个订阅。 -
你好@Enigmativity!感谢您查看代码。你能看看编辑过的问题吗?可以吗?再次提前感谢!
-
你能提供一个minimal reproducible example吗?例如,我找不到
ReadToEndObservable的代码。 -
@Enigmativity 我已经添加了所有代码来运行
Download方法。为方便起见,我还将其添加为 GitHub 存储库:github.com/SuperJMN/HttpDownloader
标签: c# .net stream system.reactive