【问题标题】:RxJS zip not working while forkJoin doesRxJS zip 不工作,而 forkJoin 工作
【发布时间】:2017-12-07 15:24:53
【问题描述】:

我想要实现的是(使用 Angular 2/Typescript):

  • Observable A 产生事件流。

  • 对于 Observable A 的每个事件,进行 8 次不同的 http 调用。 (8 个切换图)

  • 8 个请求全部返回后,做点什么(订阅 8 个 switchmap 的 zip)。

  • 对 Observable A 的每个事件重复 8 次请求(由 switchmap 和 zip 处理)

代码:(完整代码https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1

let source = Observable
.interval(5000)
.take(100);

let requests = [];

for(let i=0; i<8;i++) {
  let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
  request.subscribe(res => console.log(res.json()));
  requests.push(request);
}

Observable.zip(requests)
.subscribe(console.log("All requests completed"));

requests.forEach(r => r.connect());

问题是我的 zip 永远不会被调用。我 console.log'ged 订阅了 8 个 switchmap 中的每一个,并且每次在 Observable/stream A 中发生事件时,我都会收到显示 8 个 http 调用成功返回的日志。(也可以在网络选项卡中看到 8 个调用返回调试工具)

但 zip 从不发射任何东西。


如果我尝试不同的(不太理想的)方法:

  • 订阅 Observable A 一次(不是 switchmap)
  • 在订阅中为每个 http 调用创建 8 个 Observable,并订阅 8 个 Observable 的 ForkJoin

代码:(完整代码https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj

let source = Observable
.interval(5000)
.take(100);

 source.subscribe(x=> {
   console.log(x);
   let requests = [];

   for(let i=0; i<8;i++) {
     let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
     request.subscribe(res => console.log(res.json()));
     requests.push(request);
   }

   Observable.forkJoin(requests)
   .subscribe(console.log("All requests completed"));

   requests.forEach(r => r.connect());

 });

这行得通。但是有一个明显的陷阱,即每次 Observable A 发出时我都会创建 8+1 个嵌套的 observables/subscriptions。

(在这两种情况下,我都使用发布/连接来共享/重用订阅,但即使没有它,问题仍然存在)

【问题讨论】:

  • 显示一些代码。
  • 添加了 @RobinDijkhof 。花了一些时间从我的应用程序中提取主要逻辑

标签: javascript angular rxjs reactive-programming


【解决方案1】:

如果您使用多个参数正确调用zip 并将函数传递给订阅(不是未定义的console.log 的结果),那么您的第一个示例将起作用。 Demo.

Observable.zip(...requests) // <-- spread this 
    .subscribe(() => console.log("All requests completed")); // <-- pass a function

requests.forEach(r => r.connect());

【讨论】:

  • 这行得通!谢谢。但我很好奇为什么 forkJoin、combineLatest 等在不传播争论的情况下工作。 (还有为什么 subscribe 直接与他们一起使用 console.log() ,而不传递 lambda/arrow 函数)?
  • @flak37 “为什么 forkJoin、combineLatest 等在不传播论点的情况下工作”即席多态性。当第一个 arg 是数组时,它们有特殊处理。 code
  • @flak37 “还有为什么 subscribe 直接与他们一起使用 console.log()” 它实际上并没有工作。简单证明:plnkr.co/edit/k0XW99xu8WLdMY3ecOl9?p=preview
  • 哦,谢谢!最后,您能否解释一下为什么在最后一次“所有请求已完成
  • @flak37 打印“'This line should come before ...”的原因也一样:所有这些代码都在onNext处理程序中运行。
猜你喜欢
  • 2020-10-02
  • 1970-01-01
  • 2020-08-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-18
  • 1970-01-01
相关资源
最近更新 更多