【问题标题】:Difference between cold observable in RX and normal EnumerableRX 中的冷可观察对象与普通 Enumerable 之间的区别
【发布时间】:2011-11-05 02:46:08
【问题描述】:

我是 Rx 的新手。我可以看到使用 Hot Observables 的一些真正好处,但是最近有人问我一个问题,即冷可观察对象和等效可枚举对象之间有什么区别(请参阅下面的代码 sn-p)...

    var resRx = Observable.Range(1, 10);
    var resOb = Enumerable.Range(1, 10);

谁能非常简单地解释两者之间的区别以及我将从冷的可观察对象与可枚举对象中获得什么好处。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    两者之间有几个不同之处。

    谁控制“枚举”

    对于 observable(热或冷),是 observable 确定何时以及在哪个线程上返回值。另一方面,可枚举在您请求时获取每个值,并在请求枚举的线程上进行处理。

    代码流

    处理枚举通常在 for each 循环中完成(或偶尔获取枚举器并使用 while 循环)。您的代码通常会在继续之前处理所有值。 Observables 需要回调。阻止代码的进一步执行(比如阻止退出控制台应用程序)需要您额外的代码。 observables 有一些阻塞操作符,例如First,但它们是例外而不是正常使用的规则。

    以这个简单的示例程序为例。使用可枚举,在继续下一部分之前写入所有值。但是,不能保证永远写入 observable 中的值。在程序终止之前写入了多少值几乎是一种竞争条件。

    static void Main(string[] args)
    {
        var xs = Enumerable.Range(1, 10);
        foreach (var x in xs)
        {
            Console.WriteLine(x);
        }
        //at this point, all values have been written
    
        var ys = Observable.Range(1, 10);
        ys.Subscribe(y => Console.WriteLine(y));
        //at this point, no values have been written (in general)
    
        //either add a Console.ReadKey or some sort of wait handle that
        //is set in the OnCompleted of the observer to get values
    }
    

    异步进程

    就像您必须编写额外的代码来阻塞和等待可观察对象一样,编写使用异步进程的 IEnumerable 也需要一些额外的工作。这就是两者之间的区别真正发挥作用的地方。

    例如,在我当前正在处理的应用程序中,我需要搜索可能连接到串行端口的设备。 IObservable 非常适合这种情况,因为它允许我在找到每个设备时进行回调并通知应用程序,而无需阻塞以及操作完成时。这个 observable 符合 cold observable 的条件,因为除非有订阅者,否则它不会推送数据,并且每个订阅都会获得所有结果。 (与典型的冷可观察对象不同,我在订阅之前开始工作,但没有数据丢失,因为它被缓冲到重播主题中。)但是,由于异步,将其转换为 Enumerable 对我来说没有多大意义自然。

    【讨论】:

    • 感谢 Gideon... 伟大的洞察力。
    【解决方案2】:

    您习惯的几乎所有 Enumerables 都是“Cold”Enumerables?为什么?因为如果你在 Enumerable.Range 上使用 ForEach 两次,你会得到 2 倍的数字。

    如果 Enumerable.Range 是 Hot Enumerable,它只会为您提供一次列表,而第二个 ForEach 将是空的。

    对于 Rx,Cold Observable 意味着每次您调用 Subscribe(相当于 Rx 中的 ForEach)时,您都会收到一个新的内容列表。像 FromEvent 这样的热门 Observable 不会在您每次订阅时为您提供新的事件流,它只是与同一事件流的另一个连接。

    Rx 在这里有什么优势?能够将该范围转换为异步请求:

    IObservable<Image> fetchImageByIndex(int imageIndexOnSite);
    
    Observable.Range(0, 10)
        .SelectMany(x => fetchImageByIndex(x)) 
        .Subscribe(image => saveImageToDisk(image));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-09-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多