【问题标题】:How to wait for all observable to complete and store the result in an object如何等待所有可观察对象完成并将结果存储在对象中
【发布时间】:2021-11-10 09:31:37
【问题描述】:

Levels对象是每一层土壤的不同测量值的集合。每一层都有一组键值对(K,N,P),其中值是传感器类型的id。

我需要通过调用以 sensorType id 作为参数的 DrillData 方法为所有四个级别调用每个单元的 api,并等待所有可观察对象完成并存储结果(SensorData 中的总和)以完全相同的格式将传感器类型 ID 替换为总数。我如何达到预期的效果?提前致谢。


   export interface SensorData {
       period?:string,
       total?:number
   }

  public drillData(params?: {}):Observable<SensorData[]> {
        return this._http
          .get(this.url + "drill_data/", {
            headers: FapAPIRequestOptions.withTokenHeaders,
            params,
          }).pipe(map((r:Response)=>r["This year"] as SensorData[] ));
   }



 public levelValues: SoilLevels = {
    level30: {},
    level15: {},
    level10: {},
    level5: {},
  };

const levels={
level5: {K: 86, N: 84, P: 85, ec: 53, ph: 55, humidity: 38, temperature: 37}
level10: {K: 86, N: 84, P: 85, ec: 53, ph: 55, humidity: 38, temperature: 35}
level15: {K: 86, N: 84, P: 85, ec: 53, ph: 55, humidity: 57, temperature: 37}
level30: {K: 86, N: 84, P: 85, ec: 53, ph: 55, humidity: 93, temperature: 92}
}

for (let level of Object.keys(levels)) {
        for (let sensorKey of Object.keys(levels[level])) {
          try {
            const sensorData = await this.sensorService
              .drillData({ ...this.params, sensor: levels[level][sensorKey] })
              .toPromise();
            let total = 0;
            for (let data of sensorData) {
              total += data.total;
            }
            this.levelValues[level][sensorKey] = total;
          } catch (error) {
            console.log(error);
          }
        }
      }

我在这里使用了 toPromise(),但是它等待上一个请求完成来触发下一个请求。是否有其他方法可以并行调用所有可观察对象,并在它们全部完成后将它们存储在 this.levelValues 中。

这是我从 Api 获得的数据样本

端点:drill_data/?from=2018-09-15T10:56:40.005Z&to=2021-09-15T10:56:40.005Z&agg_by=all&sensor=92

这是来自 api 的响应

"This year":[
{period: "2021-09-14T12:15:14Z", total: 10},
{period: "2021-09-14T12:15:38Z", total: 12}
]

【问题讨论】:

  • 调用drillData方法时sensorId是什么
  • sesnorId 是我提供的每个关卡中key的值。
  • 这个我没看懂the result(sum of the total in SensorData) in exact same format replacing the sensor type id with the total.
  • 我在问题中提供的对象包含传感器 id(86,84,....)。对于每个 id,我将使用钻取数据函数进行 api 调用。该 api 返回一个 SensorData 形状的对象数组。我必须从 SensorData 中获取值并将其放置在与级别对象具有完全相同形状的对象中,但这次我将放置从 api 获得的值而不是 id 跨度>
  • 请您分享从关卡到drillData 的示例输入,以及您将从它得到的响应以及您想要对其执行的操作。

标签: javascript angular typescript rxjs observable


【解决方案1】:

使用 forkJoin

forkJoin(
  yourRequest1(),
  yourRequest2(),
  yourRequest3()
).subscribe(totalResult => console.log('After completing all observables',totalResult);

在这种情况下,您可以按照以下代码获取每个请求的结果。

  • 总结果[0]
  • 总结果[1]
  • 总结果[2]
  • 【讨论】:

    • 如果levels 包含 30 个条目会怎样? (有一个键名为“level30”)另外,yourRequest1() 是什么?
    • yourRequest1() 表示您的可观察到的第 1 号。
    【解决方案2】:

    而不是使用这个

    const sensorData = await this.sensorService.drillData({ ...this.params,sensor: levels[level][sensorKey] }).toPromise();
    

    使用这个

    const sensorData =forkJoin({
        this.sensorService.drillData({ ...this.params, sensor: levels[level][sensorKey] })
    })
    .subscribe(result => {
        console.log(result);
    });
    

    参考这个: https://www.learnrxjs.io/learn-rxjs/operators/combination/forkjoin

    【讨论】:

    • 文档说我们需要在 forkJoin 中提供一个密钥。我有不同的关卡,我该如何组织它们?
    • 没有必要。如果您传递密钥,您将更容易获得您想要的任何可观察的结果(通过使用密钥)。如果您不使用该键,它将为您提供一个对象数组,其顺序与您传递 observable 的顺序相同。(我已经在 totalResult[0] 下面给了您一个示例)
    【解决方案3】:

    这可能有一个非常聪明的本地运算符,但我不知道。 但至少,您可以像这样按顺序执行:

    import { lastValueFrom } from 'rxjs';
    
    let output = {}
    
    for( let key of Object.keys(levels)){
        output[key] = await lastValueFrom( drillData(levels[key]) )
    }
    
    console.log(output);
    

    编辑

    我想我提到的“原生和聪明”的方法是combineLatest。你可以尝试这样的事情:

    const observables:Observable<SensorData[]>[] = Object.values(levels).map(drillData.bind(this));
    
    let result:any = {};
    
    combineLatest(observables)
        .subscribe( drilled => {
            Object.keys(levels).forEach( (key, index) => result[key] = drilled[index] )
        })
    

    【讨论】:

    • lastValueFrom 仅在 rxjs 库的版本 7 中可用。我可以在版本 6 中使用 toPromise
    • 使用 promise 确实需要很长时间才能完成所有 api 调用。我需要这个 observable 并行运行并在所有 observable 完成时返回结果。到底有没有?
    • Promise 不会比 Observables 花费更长的时间。您可以使用Promise.all 并行运行它们而不是顺序运行它们
    • 如果可能的话,请您在回答中证明这一点。那会很有帮助
    • 是的,答案已更新,但这绝对未经测试,只是猜测或提示
    【解决方案4】:

    让我们从为每个嵌套键创建一个请求开始。我们将为levels 对象中的每个对象创建一个请求数组。

    from = new Date();
    to = new Date();
    finalResult$: Observable<any>; // to hold all final response
    
    const requests = Object.keys(this.levels).map(key => this.toRequest(key));
    
    toRequest(key: string): any[] {
        return Object.keys(this.levels[key]).map(K => ({
          sensorValue: this.levels[key][K],
          level: key,
          sensorKey: K,
          to: this.to,
          from: this.from
        }));
      }
    

    由于我们有一系列请求,我们将使用 switchMap 对它们进行平面映射 然后我们将使用mergeMap 进行并行 API 请求。 当我们得到响应时,我们将使用reduce 计算总值并根据它们的级别将它们放入结果中。

    请注意,reduce 只会在源 Observable 完成时发出一个最终值。

    const levels$ = from(requests);
    
    this.finalResult$ = levels$
      .pipe(
        switchMap(requestArr => requestArr),
        mergeMap(request => this.drillData(request)),
        reduce((acc, item) => this.responseReducer(acc, item), {}),
        catchError(x => of({}))
      )
      .subscribe(response => {
        console.log(response);
      });
    
    // method to reduce responses into final required level object
    responseReducer(acc, { level, sensorKey, sensorData }): any {
        const sum = this.getTotal(sensorData);
        const sensor = { [sensorKey]: sum };
    
        // if example `level15` key is already exists in accumulator
        //  we will  copy previous level data combined with current sensor data
        if (acc[level]) {
          acc = { ...acc, [level]: { ...acc[level], ...sensor } };
        } else {
          // if not we will just add into accumulator
          acc = { ...acc, [level]: sensor };
        }
        return acc;
    }
    
    public getTotal(sensorData: SensorData[]): number {
        return sensorData.reduce((acc, { total }) => (acc += total), 0);
    }
    

    在发出 API 请求时,我们将维护一个对象,该对象将跟踪已经发出的请求及其响应,以便如果已经发出请求,我们将从缓存中返回响应。

    const cachedResponse = this.getCache(uniqueKey);
    const responseMetaData = { level, sensorKey, sensorValue };
    if (cachedResponse) {
      return of({ sensorData: cachedResponse, ...responseMetaData });
    }
    

    打印响应

    <pre>
      {{ this.finalResult$ | async | json }}
    </pre>
    

    Demo

    在演示中,您将在drillData 方法中的params 中获得以下有效负载。只需使用适当的参数进行 API 调用即可。

    {
        from: "2021-09-15T16:01:54.297Z",
        level: "level5",
        sensorKey: "K",
        sensorValue: 86,
        to: "2021-09-15T16:01:54.297Z",
    }
    

    以下是我在drillData 中进行的虚拟API 调用,您只需将of 替换为您的实际API 调用

    return of(this.tempResponse[sensorValue]).pipe(
          tap(response => this.setCache(uniqueKey, response)),
          map(sensorData => ({ sensorData, ...responseMetaData }))
        );
    

    如果有任何遗漏,请告诉我。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-08-19
      • 2019-03-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-09
      • 2019-04-23
      相关资源
      最近更新 更多