【问题标题】:How to combine multiple rxjs BehaviourSubjects如何组合多个 rxjs BehaviourSubjects
【发布时间】:2017-03-16 17:25:07
【问题描述】:

我正在构建一个 Angular2 应用程序,并且有两个 BehaviourSubjects 我想在逻辑上合并为一个订阅。我正在发出两个 http 请求,并希望在它们都回来时触发一个事件。我正在查看 forkJoincombineLatest。似乎 combineLatest 会在任何一个 behvaviorSubjects 更新时触发,而 forkJoin 只会在所有 behavoirSubjects 都更新后触发。它是否正确?必须有一个普遍接受的模式,不是吗?

编辑
这是我的 angular2 组件订阅的其中一个行为主题的示例:

export class CpmService {

    public cpmSubject: BehaviorSubject<Cpm[]>;

    constructor(private _http: Http) {
        this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
    }

    getCpm(id: number): void {
        let params: URLSearchParams = new URLSearchParams();
        params.set('Id', id.toString());

        this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .subscribe(_cpm => {
                this.cpmSubject.subscribe(cpmList => {
                    //double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres
                    if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                        cpmList.push(_cpm);
                        this.cpmSubject.next(cpmList);
                    }
                })
            });
    }
}

这是我的组件订阅的 sn-p:

  this._cpmService.cpmSubject.subscribe(cpmList => {
      doSomeWork();
  });

但我不想在单个订阅上触发 doSomeWork(),我只想在 cpmSubject 和 fooSubject 触发时触发 doSomeWork()。

【问题讨论】:

  • http-request 不能直接返回 BehaviorSubject - 我假设你是 nexting http-responses 每个都变成 BehaviorSubject 或者甚至可能订阅 Subjectget/post/put?
  • @olsn 是的,我正在订阅 http 响应,并在我的主题下一个他们在服务类中的响应
  • 行为主体的公共访问是一种反模式。请使用带有“as Observable”的向下转换的吸气剂。所以不能在服务外使用下一个调用->关注点分离

标签: rxjs behaviorsubject


【解决方案1】:

您可以使用zip-operator,其工作方式类似于 combineLatest 或 forkJoin,但仅在两个流都发出时触发:http://reactivex.io/documentation/operators/zip.html

zipcombineLatest 的区别是: Zip 只会触发“并行”,而combineLatest 将触发任何更新并发出每个流的最新值。 因此,假设以下 2 个流:

streamA => 1--2--3
streamB => 10-20-30

zip:

  • “1, 10”
  • “2, 20”
  • “3, 30”

combineLatest:

  • “1, 10”
  • “2, 10”
  • “2, 20”
  • “3, 20”
  • “3, 30”

这也是一个活生生的例子:

const a = new Rx.Subject();
const b = new Rx.Subject();

Rx.Observable.zip(a,b)
  .subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
  .subscribe(x => console.log("combineLatest: " + x.join(", ")));

a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
&lt;script src="https://unpkg.com/rxjs/bundles/Rx.min.js"&gt;&lt;/script&gt;

还有一个旁注:永远不要在订阅内订阅。 改为这样做:

this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .withLatestFrom(this.cpmSubject)
            .subscribe([_cpm, cpmList] => {
                if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                    cpmList.push(_cpm);
                    this.cpmSubject.next(cpmList);
                }
            });

【讨论】:

  • zip 和 combineLatest 有什么区别?
  • 感谢您的详细解答。我正在尝试尝试,但在我的 Rx.Observable 对象上找不到 zip 方法。
  • 如果您还没有导入整个库,请确保通过import "rxjs/add/observable/zip"添加它; - 它确实存在:github.com/ReactiveX/rxjs/blob/…
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-08-15
  • 1970-01-01
  • 1970-01-01
  • 2023-04-03
  • 1970-01-01
  • 2018-04-07
相关资源
最近更新 更多