【发布时间】:2013-01-19 22:03:36
【问题描述】:
使用响应式扩展,我如何创建一个 Observable,它将持续调用流上的 Read 方法并将结果传播给它的观察者?
或者这是完全错误的处理方式?我应该实现自己的 IObservable 吗?
【问题讨论】:
标签: c# system.reactive
使用响应式扩展,我如何创建一个 Observable,它将持续调用流上的 Read 方法并将结果传播给它的观察者?
或者这是完全错误的处理方式?我应该实现自己的 IObservable 吗?
【问题讨论】:
标签: c# system.reactive
我从来没有遇到过实现我自己的 observable 有意义的情况。
试试这个:
public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize)
{
return Observable.Create<byte[]>(o =>
{
var buffer = new byte[bufferSize];
var read = 0;
try
{
while (true)
{
read = stream.Read(buffer, 0, buffer.Length);
if (read == 0)
{
break;
}
var results = buffer.Take(read).ToArray();
//Always return a copy
//never the buffer for concurrency's sake.
o.OnNext(results);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
finally
{
o.OnCompleted();
}
return Disposable.Empty;
});
}
【讨论】:
IObservable<byte[]> ObservableRead(Func<Stream> streamFactory, int bufferSize),这样这种方法才能正常运行。
finally中调用OnCompleted()?如果你把它放在try 的末尾,会不会一样?
Publish 方法。这就是它的用途。你永远不应该尝试实现你自己的IObservable。如果你试图让 observable 保持状态并管理多个观察者,那么你的工作就会变得非常困难。使用内置运算符。它们将有助于确保您的代码安全。