【问题标题】:Rxjs - Subscribing to interdependent observablesRxjs - 订阅相互依赖的 observables
【发布时间】:2016-12-25 02:01:28
【问题描述】:

我正在学习 Angular 2 和 rxjs。我有 3 个变量,A B C。

  • B 取决于 A 的值
  • C 取决于 A 和 B 的值

我正在尝试设置 observables:当 A 更新时,B 和 C 将自动更新。当 B 更新时,C 将自动更新。我尝试了两种设置,但都不令人满意。

  • 第一次设置:B 订阅 observable A; C 订阅了 observable B withlatestfrom A。A 中的更改确实级联到 B,然后到 C,但来自 A 的值不是最新的。
  • 第二次设置:B 订阅 observable A; C 订阅了 A 和 B 的 combineLatest observable。此设置有效,但我得到了 C 的两个更新,首先来自 B,然后来自 A。

如何设置我的 observables / 订阅,以便在更新 A 时,C 只会使用 A 和 B 的最新值更新一次?

编辑 - 添加代码

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();

A_Observable.subscribe(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  B.next(newB);
});

// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable);
combineAllObservable.subscribe(function(data) {
  console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);

// SATISFACTORY RESULT
setTimeout(function(){
  console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

【问题讨论】:

  • 如果您包含代码而不是代码描述,您的问题会更清楚。

标签: angular rxjs reactive-programming rxjs5


【解决方案1】:

代码是这样运行的,因为默认情况下 RxJs 会同步运行代码。 withLatestFrom 案例的事件链:

  1. 你推送新的A值:A.next(2);
  2. RxjS看到A有两种用法:
    1. B-subscription(先到先得)
    2. withLatestFrom(第二名)
  3. RxJs 调用B-subscription
    1. 推送B的新值
    2. RxjS看到有B的用法:C-subscription
    3. RxJs 调用C-subscription(旧值为A!)
  4. A 的新值被拉入withLatestFrom,但C-订阅未触发(因为它仅由B 更新触发)

可能的解决方案是添加debounceTime(0)。当withLatestFrom 中的值已经更新时,它将强制C-subscription 异步运行。

RxJs 的Scheduler 的更多信息谷歌。

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();

A_Observable.subscribe(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  B.next(newB);
});

// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.debounceTime(0).withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable).debounceTime(0);
combineAllObservable.subscribe(function(data) {
  console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);

// SATISFACTORY RESULT
setTimeout(function(){
  console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

我想补充两点:

  1. withLatestFrom 方法在您的情况下更好,因为依赖关系图看起来像 C -> B -> A 并且当 A 更新时 B 也会更新。如果AB 是独立的,那么combineLatest 将是更好的选择。
  2. Subject 不需要 B。你可以像这样重写代码(没有debounceTime!)

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();

var B_Observable = A_Observable.map(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  return newB;
});

var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

【讨论】:

  • 感谢您的详细解释。你的解决方案对我有用。
【解决方案2】:

好的,所以我用你的代码进行了测试。调整以改变一些事情。您可以在此处查看代码:https://jsbin.com/zonedirogi/edit?html,js,console,output。主要的变化实际上是用 A 的最新代替 B,而是用 B 的最新改为 A。然后它按预期工作。

仅供参考,您无需使用asObservable() 将主题转换为可观察对象。

var a = new Rx.BehaviorSubject(1);
var b = new Rx.BehaviorSubject(10);

a.subscribe(x => {
  b.next(x * 10);
});

a.withLatestFrom(b).subscribe(x => {
  console.log(x)
})

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  a.next(2);
}, 1000);

【讨论】:

  • 我简化了问题。在实际情况下,A 和 B 属于不同的类,我不想将变量公开为主题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-08-10
  • 2017-03-26
  • 2020-10-28
  • 2020-09-05
  • 2017-08-24
  • 1970-01-01
  • 2017-12-17
相关资源
最近更新 更多