【发布时间】:2017-12-07 15:24:53
【问题描述】:
我想要实现的是(使用 Angular 2/Typescript):
Observable A 产生事件流。
对于 Observable A 的每个事件,进行 8 次不同的 http 调用。 (8 个切换图)
8 个请求全部返回后,做点什么(订阅 8 个 switchmap 的 zip)。
对 Observable A 的每个事件重复 8 次请求(由 switchmap 和 zip 处理)
代码:(完整代码https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1)
let source = Observable
.interval(5000)
.take(100);
let requests = [];
for(let i=0; i<8;i++) {
let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
request.subscribe(res => console.log(res.json()));
requests.push(request);
}
Observable.zip(requests)
.subscribe(console.log("All requests completed"));
requests.forEach(r => r.connect());
问题是我的 zip 永远不会被调用。我 console.log'ged 订阅了 8 个 switchmap 中的每一个,并且每次在 Observable/stream A 中发生事件时,我都会收到显示 8 个 http 调用成功返回的日志。(也可以在网络选项卡中看到 8 个调用返回调试工具)
但 zip 从不发射任何东西。
如果我尝试不同的(不太理想的)方法:
- 订阅 Observable A 一次(不是 switchmap)
- 在订阅中为每个 http 调用创建 8 个 Observable,并订阅 8 个 Observable 的 ForkJoin
代码:(完整代码https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj)
let source = Observable
.interval(5000)
.take(100);
source.subscribe(x=> {
console.log(x);
let requests = [];
for(let i=0; i<8;i++) {
let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
request.subscribe(res => console.log(res.json()));
requests.push(request);
}
Observable.forkJoin(requests)
.subscribe(console.log("All requests completed"));
requests.forEach(r => r.connect());
});
这行得通。但是有一个明显的陷阱,即每次 Observable A 发出时我都会创建 8+1 个嵌套的 observables/subscriptions。
(在这两种情况下,我都使用发布/连接来共享/重用订阅,但即使没有它,问题仍然存在)
【问题讨论】:
-
显示一些代码。
-
添加了 @RobinDijkhof 。花了一些时间从我的应用程序中提取主要逻辑
标签: javascript angular rxjs reactive-programming