【问题标题】:Rx: What are subscriptions and how do subscriptions work?Rx:什么是订阅以及订阅是如何工作的?
【发布时间】:2023-03-14 06:57:01
【问题描述】:

我正在学习 .NET 中的响应式扩展 (rx),我对“订阅”到底是什么以及何时使用它感到有些困惑。

让我们从this线程中获取一些样本数据:

using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {               
            var source =
               Observable.Create<Result>(f =>
               {
                   Console.WriteLine("Start creating data!");

                   f.OnNext(new Result() { Text = "one", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "two", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "three", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "four", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "five", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "six", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "seven", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "eight", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "nine", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "ten", Flag = false });

                   return () => Console.WriteLine("Observer has unsubscribed");
               });
        }
    }
}

当心这条线:

 Console.WriteLine("Start creating data!");

现在,首先我认为订阅只是通过使用.Subscribe 运算符来使用。所以一个观察者(例如.Subscribe函数的回调)订阅一个observable(一系列操作符的最后一个返回值),就像这样(作为一个例子,查询没有真正的用途):

  source.Zip(source, (s1, s0) =>
     s0.Flag
     ? Observable.Return(s1)
     : Observable.Empty<Result>()).Merge().Subscribe(f => { Console.WriteLine(f.Text); });

现在我期待得到“开始创建数据!”只输出一次,因为我只使用一个订阅。但实际上,我得到了两次

Start creating data!
Start creating data!
two
five
six
seven
nine

有人告诉我,每次我在 source. 上使用运算符时,都会进行订阅。但在这个例子中,我只使用了一次source.,然后第二次作为.Zip 运算符的参数。还是因为源被再次订阅的值传递给.Zip函数?

所以我的问题是:

  1. 就 Rx 而言,“订阅”究竟是什么?
  2. 在我的示例中,这两个订阅究竟发生在哪里/为什么发生?

顺便说一句。我知道我可以通过使用 .Publish 运算符来防止发生多个订阅,但这不是我的问题范围。

【问题讨论】:

  • 您需要像我在上一个问题中所做的那样创建您的测试可观察对象 - 您的这种 Observable.Create&lt;Result&gt; 方法迟早会让您感到悲伤。
  • 谢谢,我这样做是因为我提到了我的其他问题。 :) 实际上,我现在将使用您的 .generate 解决方案进行测试。

标签: c# .net rxjs reactive-programming system.reactive


【解决方案1】:

简而言之,订阅仅代表已订阅的Observable。此过程可以通过使用.Subscribe 显式发生,也可以通过连接两个或多个Observables 然后订阅生成的链来隐式发生。

在您的情况下,您会看到两种情况都发生,一次是在您调用Subscribe 时显式发生,一次是在您将source 传递给Zip 时隐式发生,也就是说,有两个Subscriptionssource @987654329 @。

为什么这很重要?因为默认情况下Observables 是惰性的,这意味着它们在您订阅它们之前不会开始处理(该过程的产物是Subscription),因此扩展这意味着 任何 您订阅的时间到Observable 它将有效地开始一个新的流。这种行为可以像你提到的Publish 那样被覆盖,但默认情况下每个Observable 都是

在您的特定情况下,由于您将相同的Observable 传递给Zip,它需要订阅它两次,因为它会将来自两个传递的流的事件压缩在一起。结果是对同一个Observable 的两个订阅,每个订阅彼此独立运行。

【讨论】:

  • "这个行为可以像你提到的 Publish 一样被覆盖,但默认情况下每个 Observable 都是冷的。" - 这并不完全正确。 observable 是热的还是冷的都没有关系——每个订阅都会创建一个新的 observable 管道(即订阅)。 observable 没有“默认值”——它们可以是热的也可以是冷的。重要的是如何创建 observable。调用 .Publish 不会使 observable 变热——它只是允许已发布 observable 的调用者共享一个底层订阅。
  • 你的最后一段是正确的——尤其是“每个独立运行”这一点。如果可观察到的source.Zip(source, (s1, s0) =&gt; 被写成source.Publish(ss =&gt; ss.Zip(ss, (s1, s0) =&gt;,那么两个ss 订阅仍将彼此独立运行,但只有一个source 订阅。这就是.Publish 所做的。
  • 那么实际上,这里甚至有三个使用.Publish的订阅:一个到源,两个到“zip”?
  • @TobiasvonFalkenhayn - 不,还有更多。如果您接受我对上一个问题的回答,source.StartWith(new Result() { Flag = true }).Publish(ss =&gt; ss.Skip(1).Zip(ss, (s1, s0) =&gt; s0.Flag ? Observable.Return(s1) : Observable.Empty&lt;Result&gt;()).Merge());,则有 18 个订阅。 1 source,2 StartWith(new Result() { Flag = true }),3 Publish,4 ss,5 .Skip(1),6 .Zip,7 ss(拉链内),8 .Merge,然后是 10 Observable.Return(s1) : Observable.Empty&lt;Result&gt;() 对。
  • @TobiasvonFalkenhayn - 举个简单的例子:source.Where(x =&gt; x &gt; 10).Select(x =&gt; x * 2).Subscribe().Subscribe()订阅.Select.Select订阅.Where.Where订阅source
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-09
  • 2016-12-19
相关资源
最近更新 更多