【问题标题】:How to convert a sequence of Promises into Rx.Observable with RxJS?如何使用 RxJS 将 Promises 序列转换为 Rx.Observable?
【发布时间】:2017-03-07 10:07:27
【问题描述】:

我正在尝试使用 RxJS 从一系列 Promise 中创建一个 Rx.Observable。与this question 的区别在于我有未知数量的 Promise,每个 Promise 都依赖于前一个的结果。

基本上我有一系列页面,与“下一页”链接相连。

我想要函数做的是:

  • 等待承诺
  • 提供结果(fireobserver.onNext())
  • 检查是否有下一页链接
  • 使用该链接创建下一个 Promise
  • 重复直到剩下页面

我尝试了以下方法:

private getPages<T>(firstPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {

    let observable = Rx.Observable.create<T>(async obs => {
        let page = await firstPromise;
        page.value.forEach(v => obs.onNext(v));

        while (page['@odata.nextLink']) {
            let nextPageUrl = <string>page['@odata.nextLink'];
            let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
            page = await nextPagePromise;
            page.value.forEach(v => obs.onNext(v));
        }

        obs.onCompleted();
    });

    return observable;
}

IODataCollectionResult 是 OData 结果,其中 '@odata.nextLink' 是下一页 url,.value 是值数组)

问题是我无法用 TypeScript 编译它,它给了我一个错误:

'(obs: Observer) => Promise' 类型的参数不能分配给'(observer: Observer) => void |功能 | IDisposable'。

这是有道理的,因为异步函数返回 Promise&lt;void&gt;,而不是 void

这是否意味着我不能在 Rx.Observable.create() 中使用 async/await?如何将一系列 Promise 链接到 Observable 中?

【问题讨论】:

    标签: javascript typescript async-await rxjs


    【解决方案1】:

    您可以将async function 包装在使其结果无效的内容中:

    function toVoid<A>(fn: A => Any): A => Void {
        return x => void fn(x)
    }
    

    (请原谅我对 TypeScript 的了解不足,但我希望你能猜到它应该做什么)

    这样,你应该可以打电话了

    let observable = Rx.Observable.create<T>(toVoid(async obs => {
        …
    }));
    

    但也许你不应该那样做。不要丢弃承诺,而是使用它来附加适当的错误处理程序:

    let observable = Rx.Observable.create<T>(obs => {
        (async () => {
            …
        }()).catch(err => {
            obs.onError(err);
        });
    });
    

    【讨论】:

      【解决方案2】:

      使用 .then() + 递归解决了问题,没有 async/await:

      private getPages<T>(initialPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {
          return Rx.Observable.create<T>(obs => {
              const getPage = (promise: PromiseLike<IODataCollectionResult<T>>) => {
                  promise.then(page => {
                      page.value.forEach(v => obs.onNext(v));
                      if (page['@odata.nextLink']) {
                          let nextPageUrl = <string>page['@odata.nextLink'];
                          let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
                          getPage(nextPagePromise);
                      }
                      else {
                          obs.onCompleted();
                      }
                  });
              }
              getPage(initialPromise);
          });
      }
      

      【讨论】:

        猜你喜欢
        • 2018-12-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-11-14
        • 2015-10-19
        • 1970-01-01
        • 1970-01-01
        • 2016-07-01
        相关资源
        最近更新 更多