【问题标题】:A better way to pass data from one observable to another将数据从一个可观察对象传递到另一个对象的更好方法
【发布时间】:2016-10-04 12:11:26
【问题描述】:

我创建了两个函数来测试Observables,每个函数都返回一个Observable

foo() {
  return new Observable(observer => {
    let i = 0;
    setInterval( () => {
      if(i === 10) {
        observer.complete();
      } else {
        observer.next(i);
        i++;
      }
    }, 1000);
    // If I call observer.complete() here then it never completes
  });
}

bar(fooResult) {
  return new Observable(observer => {
    let j = 0;
    setInterval( () => {
      if(fooResult) {
        observer.next('j -> '+j+' plus fooResult '+JSON.stringify(fooResult));
        observer.complete();
      } else {
        observer.next(j);
        j++;
      }
    }, 2000);
  });
}

并像这样使用它们:

    let fooResult = [];

    // Testing observables...
    this.exampleProductService.foo().subscribe(
      (res) => { 
        console.log('foo next() -> '+res);
        fooResult.push(res);
      },
      (err) => {
        console.error('foo error: '+JSON.stringify(err));
      },
      () => { 
        console.log('foo finished');
        this.exampleProductService.bar(fooResult).subscribe(
          (res) => {
            console.log('bar next() -> '+res);
          },
           (err) => {
            console.error('bar error: '+JSON.stringify(err));
          },
          () => {
            console.log('bar finished');
          }
        );
      }
    );

提出的问题是:

  1. 有没有更好的方法将数据从 Observable 的完成传递到另一个也返回 Observable 的函数?建立一个数组似乎很麻烦,我不能做以下事情,因为Observablecomplete callback 部分没有传递像progressUpdateonError 这样的参数:

    (complete) => { this.exampleProductService.bar(complete).// rest of code } 
    
  2. 我尝试将第一个函数的结果分配给一个变量,然后传递该变量,但正如预期的那样,我得到了一个 Observable 而不是我想要的结果。

  3. 我上面的做法有什么不对吗?

谢谢

附:这是一个 Angular 2 应用程序!

【问题讨论】:

    标签: angular rxjs observable


    【解决方案1】:

    我认为您的函数有点过于复杂。一方面,当工厂函数已经可用时不要使用构造函数,在这种情况下intervaltimer 正如@Meir 指出的那样,尽管在这种情况下它会更冗长。

    其次,bar 函数实际上并没有多大意义,因为您似乎在等待完成您已经知道已经完成的事情(因为您直到完成块才订阅它由foo 生成的 Observable)。

    我根据您声明的目标进行了重构,即等待一个 Observable 完成,然后再开始第二个,同时在第二个中使用第一个的结果。

    // Factory function to emit 10 items, 1 every second
    function foo() {
      return Observable.interval(1000)
        .take(10);
    }
    
    // Lifts the passed in value into an Observable and stringfys it
    function bar(fooResult) {
      return Rx.Observable.of(fooResult)
        .map(res => JSON.stringify(fooResult))
    }
    

    现在当使用它们时,你会这样做:

    foo()
      // Log the side effects of foo
      .do(
        x => console.log(`foo next() -> ${x}`),
       err => console.error(`foo error: ${JSON.stringify(err)}`),
       () => console.log('foo finished')
      )
      // Collect the results from foo and only emit when foo completes
      .reduce((total, diff) => [...total, diff], [])
    
      // Pass the result from the reduce on to bar
      .concatMap(fooResult => bar(fooResult))
    
      //Subscribe to the results of bar
      .subscribe(
        res => console.log(`bar next() -> ${res}`),
        err => console.error(`bar error: ${JSON.stringify(err)}`),
        ()  => console.log('bar finished')
      );
    

    注意上面我也摆脱了全局状态,这是函数式编程的诅咒。您的状态应尽可能本地化到流中。

    在此处查看工作示例:http://jsbin.com/pexayohoho/1/edit?js,console

    【讨论】:

    • 感谢保罗的反馈。在这种情况下,您具体如何摆脱全局状态?
    • 另外——你能解释一下这行吗? .reduce((total, diff) => [...total, diff], []) 从文档来看,它充当 Observable 上的累加器,并在 i 完成时返回一个值。我知道,但是提供的可选种子(差异​​)是什么以及 [...total, diff] 是什么意思 - 我以前从未见过这个
    • re:全局状态,不再有名为fooResult的全局数组。回复:语法,[...total, diff]total.concat([diff]) 的 ES6 简写,基本上它是创建一个新的数组副本加上新项以避免改变现有项。 diff 是传入的新值,而不是可选种子,它以空数组 [] 的形式提供。这能回答你的问题吗?
    • 是的 paulpdaniels 确实如此。感谢您的澄清
    【解决方案2】:

    我不确定您想要实现什么,但这里有一个 jsbin that 尝试复制您的代码。

    注意事项:

    您的 foo() 使用 Observable.timer 和 .take() 运算符更容易创建。

    你的 bar() 可以被另一个带有地图和 .takeWhile() 运算符的计时器使用。

    至于最后一次订阅(完整部分),它只会打印 'foo finished' 而不会打印其他内容,因为它订阅了一个已经终止的非缓冲序列。

    【讨论】:

    • 感谢 Meir 的反馈——我试图实现的是将输出从一个 observable 传递到另一个的方法。我喜欢您示例中 foo 部分的简洁性
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-07-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-12
    • 1970-01-01
    相关资源
    最近更新 更多