【问题标题】:How would you return an Observable<T> from a function你将如何从函数返回 Observable<T>
【发布时间】:2019-06-08 23:41:09
【问题描述】:

我有这个功能:

  getData(): Observable<ServerResponse> {
    const data = {} as ServerResponse;

    data.client = this.getClientData().pipe(
      map(response =>
        response.map(x => {
          return x.data.client;
        }),
      ),
    );

    data.otherData = this.otherData().pipe(
      map(response =>
        response.map(x => {
          return this.groupByTimePeriod(x.data.other, 'day', 'week');
        }),
      ),
    );
  }

该函数需要返回 data 以及函数内部分配给该函数的所有属性。你会怎么做?

如果我只是返回 data 它不起作用,因为 this.getClientData()this.otherData() 尚未完成。

【问题讨论】:

    标签: angular typescript rxjs observable


    【解决方案1】:

    好吧,您在这里有多个问题/问题。我将从最简单的开始。你如何从一个函数/对象中得到一个 observable?答案是通过可观察的of

    return of(data);
    

    但是你避开了一个更大的问题,那就是:你如何推迟返回数据,直到子 observables 发出它们的值?您正在寻找forkJoin。通过文档:

    forkJoin 将等待所有传递的 Observables 完成,然后它会发出一个数组,其中包含来自相应 Observables 的最后一个值。因此,如果您将n Observables 传递给运算符,则结果数组将具有n 值,其中第一个值是第一个 Observable 发出的最后一个值,第二个值是第二个 Observable 发出的最后一个值,依此类推。这意味着forkJoin 不会发出超过一次,然后会完成。

    您还有其他几个问题。例如,您永远不会订阅this.getClientData()this.otherData()。 Observables 被延迟执行。你的 observable 中的代码在订阅它之前不会执行。来自the docs

    Observable.create(function subscribe(observer) {...}) 中的代码表示“可观察执行”,只有每个订阅的观察者才会发生的惰性计算

    您似乎也在使用pipe/map 来尝试在您的data 对象上设置属性。但是你永远不会设置data.clientdata.other,所以它们永远是空的。

    所以,综合起来,这就是您的代码可能的样子,模拟服务器延迟表明forkJoin 等待两个可观察对象的完成:

    import { Injectable } from '@angular/core';
    import { Observable, of, forkJoin } from 'rxjs';
    import { delay } from 'rxjs/operators';
    
    @Injectable({
        providedIn: 'root'
    })
    export class TestService {
        getData(): Observable<ServerResponse> {
            const allOperations = forkJoin(
                this.getClientData(),
                this.getOtherData()
            );
    
            const observable = Observable.create(function subscribe(observer) {
                // Wait until all operations have completed
                allOperations.subscribe(([clientData, otherData]) => {
                    const data = new ServerResponse;
                    // Update your ServerReponse with client and other data
                    data.otherdata = otherData.other;
                    data.client = clientData.client;
                    // Now that data is 100% populated, emit to anything subscribed to getData().
                    observer.next(data);
                    observer.complete();
                });
    
            });
            // We return the observable, with the code above to be executed only once it is subscribed to
            return observable;
        }
    
        getClientData() : Observable<any> {
            return of({ client: 'Client 1' });
        }
        getOtherData(): Observable<any> {
            // Fake server latency
            return of({ other: 'Other data that takes a while to return from server...' })
                .pipe(delay(2000));
        }
    }
    
    export class ServerResponse {
        client: string;
        otherdata: string;
    }
    

    如果你调用 getData() 并订阅 observable,你会看到 forkJoin 按预期工作,我们必须等待 2 秒才能完成子 observable 和我们的 observable 发射一个值:

    this.testService.getData().subscribe(data => {
      console.log(data);
    });
    

    看来您可能是 RxJS / 异步编程的新手。我建议你有机会时阅读the excellent introduction 给 RxJs。一开始可能会很棘手,但随着练习,这将成为第二天性。

    【讨论】:

    • 感谢您提供具有如此详细解释的解决方案。我会尝试一下,看看它是否有效。
    猜你喜欢
    • 2019-08-05
    • 1970-01-01
    • 1970-01-01
    • 2019-12-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-14
    • 2016-07-01
    相关资源
    最近更新 更多