【问题标题】:How to build an observable which calls multiple observables based on previous result?如何构建一个基于先前结果调用多个可观察对象的可观察对象?
【发布时间】:2020-07-22 15:02:44
【问题描述】:

我有一个端点返回{ ids: [1, 2, 3, 45] }

另一个返回给定 id 的值:{ id: 3, value: 30, active: true }

我正在尝试构建一个 observable,它调用第一个端点并为每个返回的 id 调用第二个端点并发出所有 active = true 值的总和:

private getIds(group: string) {
  const url = '...';
  return this.http.get<{ ids: number[] }>(url, { params: { group } });
}

private getValue(id: number) {
  const url = '...';
  return this.http.get<ActiveValue>(url, { params: { id: id.toString() } });
}

public getSum(group: string) {
  let sum = 0;
  const bSubject = new BehaviorSubject(sum);

  const observables = this.getIds(group).pipe(
    mergeMap(({ ids }) => ids),
    map(id => this.getValue(id).pipe(tap(({ value, active }) => {
      if (active) {
        sum += value;
        bSubject.next(sum);
      }
    })))
  );

  const observable = forkJoin(observables).pipe(map(() => sum));
  return { bSubject, observable };
}

interface ActiveValue {
  value: number;
  active: boolean;
}

但它抱怨:

forkJoin is deprecated: Use the version that takes an array of Observables instead (deprecation)

此外,当我将鼠标悬停在 observables 上时,它会显示:

const observables: Observable<Observable<ActiveValue>>

...虽然我认为应该是Observable&lt;ActiveValue&gt;[]

我怎样才能让它工作?

【问题讨论】:

  • "虽然我认为它应该是 Observable[]" 你能解释一下你得出这个结论的思考过程吗?为什么你要按照你的方式建造你的管道? (你好像误解了mergeMap和map)

标签: angular typescript rxjs rxjs6


【解决方案1】:

我不舒尔,但你可以尝试这样的事情

interface ActiveValue {
    value: number;
    active: boolean;
}

function countActiveValues(values: ActiveVale[]) {
    return values.reduce((acc, { value, active }) => acc + active ? value : 0, 0)
}

class MyClass {
    private getIds(group: string) {
        const url = '...';
        return this.http.get < { ids: number[] } > (url, { params: { group } });
    }

    private getValue(id: number) {
        const url = '...';
        return this.http.get < ActiveValue > (url, { params: { id: id.toString() } });
    }

    private getRequests(ids: number[]) {
        return ids.map((id) => this.getValue(id));
    }

    public getSum(group: string) {
        return this.getIds(group).pipe(
            map(({ ids }) => this.getRequests(ids)),
            switchMap((requests) => forkJoin(requests)),
            map((results) => countActiveValues(result))
        );
    }
}

并且不要忘记为您的请求捕获错误;)

【讨论】:

  • 谢谢,它现在可以正常工作了……这有点违反直觉,我需要多读几遍 rxjs 文档才能让这些事情在我的脑海中出现
  • 第一次接触 RxJS 时,它是一个相当困难的概念,但随着实践,它变得更加清晰 =)
【解决方案2】:

可能是这样的:

const data = [
  { id: 1, value: 30, active: true },
  { id: 2, value: 10, active: false },
  { id: 3, value: 5, active: true },
  { id: 45, value: 1, active: false }
]

function getIds(group) {
    const promise = new Promise(function(resolve, reject) {
     setTimeout(function() {
        const resp = { ids: [1, 2, 3, 45] }
       resolve(resp.ids);
     }, 100);
   });
   return Rx.Observable.fromPromise(promise)
   
}

function getValue(id) {
        const promise = new Promise(function(resolve, reject) {
     setTimeout(function() {
       resolve(data.find(x => x.id == id));
     }, 100);
   });
   return Rx.Observable.fromPromise(promise)
}

const reduceSum = (acc, {active, value}) => acc += active ? value : 0

function getSum(group) {
  return this.getIds(group)
    .mergeMap(ids => {
        return Rx.Observable.from(ids)
        .mergeMap(id => this.getValue(id))
        .reduce(reduceSum, 0)
    })
}

getSum().subscribe(console.log)
&lt;script src="https://unpkg.com/@reactivex/rxjs@5.5.12/dist/global/Rx.js"&gt;&lt;/script&gt;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-03
    • 2011-06-17
    • 1970-01-01
    • 2020-01-25
    • 1970-01-01
    相关资源
    最近更新 更多