【问题标题】:Observable which reads a stream until it ends or there is an errorObservable 读取流直到它结束或出现错误
【发布时间】:2013-01-19 22:03:36
【问题描述】:

使用响应式扩展,我如何创建一个 Observable,它将持续调用流上的 Read 方法并将结果传播给它的观察者?

或者这是完全错误的处理方式?我应该实现自己的 IObservable 吗?

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    我从来没有遇到过实现我自己的 observable 有意义的情况。

    试试这个:

    public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize)
    {
        return Observable.Create<byte[]>(o =>
        {
            var buffer = new byte[bufferSize];
            var read = 0;
            try
            {
                while (true)
                {
                    read = stream.Read(buffer, 0, buffer.Length);
                    if (read == 0)
                    {
                        break;
                    }
                    var results = buffer.Take(read).ToArray();
                    //Always return a copy
                    //never the buffer for concurrency's sake.
                    o.OnNext(results);
                }
            }
            catch (Exception ex)
            {
                o.OnError(ex);
            }
            finally
            {
                o.OnCompleted();
            }
            return Disposable.Empty;
        });
    }
    

    【讨论】:

    • 这很好,但它只在有订阅时运行。如果我想拥有多个订阅者并让它一直运行怎么办?我相信我需要使用 Observer.Generate 代替。现在想弄清楚。
    • @NoPyGod - 您不希望多个订阅者订阅一个流。每个人都会竞争获取字节。您需要一个可以发布给多个订阅者的阅读流。或者您可以将文件读入内存并移动它。理想情况下,您的签名应该是IObservable&lt;byte[]&gt; ObservableRead(Func&lt;Stream&gt; streamFactory, int bufferSize),这样这种方法才能正常运行。
    • 你为什么在finally中调用OnCompleted()?如果你把它放在try 的末尾,会不会一样?
    • @svick - 是的,你是对的。我会声称它是为了可读性。 ;-)
    • @NoPyGod - 那么你应该使用Publish 方法。这就是它的用途。你永远不应该尝试实现你自己的IObservable。如果你试图让 observable 保持状态并管理多个观察者,那么你的工作就会变得非常困难。使用内置运算符。它们将有助于确保您的代码安全。
    猜你喜欢
    • 2019-01-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-10-19
    • 1970-01-01
    • 2021-02-27
    • 1970-01-01
    • 2019-04-23
    相关资源
    最近更新 更多