【问题标题】:RxJs Array of Observable to Observable of ArrayRxJs Array of Observable to Observable of Array
【发布时间】:2021-07-08 17:54:31
【问题描述】:

对于在 TypeScript 中使用 Angular2 编写的 Web 应用程序,我需要使用 RxJs Observables。
由于我以前从未使用过 rxjs,而且我对响应式编程很陌生,所以有时我很难找到正确的方法来做一些特定的事情。
我现在面临一个问题。我必须将Array<Observable<T>> 转换为Observable<Array<T>>
我将尝试用一个例子来解释它:

  • 我有一个Observable,它为我提供了Users (Observable<Array<User>>) 的列表
  • User 类有一个函数 getPosts 返回一个 Observable<Array<Post>>
  • 我需要将Observable<Array<User>> 映射到Observable<Array<Post>> 以评估onNext 函数内的所有Posts。

我可以很容易地从Observable<Array<User>>映射到Observable<Array<Observable<Array<Post>>>>使用
map((result : Array<User>) => result.map((user : User) => user.getPosts()))
我可以将Array<Array<Post>> 展平为Array<Post>
但是我只是找不到将Observable<Array<Observable<Array<Post>>>> 映射到Observable<Array<Array<Post>>> 的正确方法
到目前为止,我将combineLatest 函数与flatMap 一起使用。
对我来说,它似乎有效,我使用的编辑器(Atom 编辑器)没有显示任何错误。但是现在我使用 Netbeans,它显示了这段代码中的错误。使用“tsc”编译代码也会导致以下错误:

Argument of type '(result: Post[]) => void' is not assignable to parameter of type 'NextObserver<[Observable<Post>]> | ErrorObserver<[Observable<Post>]> | CompletionObserver<[...'.
Type '(result: Post[]) => void' is not assignable to type '(value: [Observable<Post>]) => void'.  

所以我的问题是:
我怎样才能将ArrayObservables“扁平化”为Array

【问题讨论】:

  • getPosts方法对应什么?它会加载特定用户的帖子吗?

标签: typescript angular rxjs observable


【解决方案1】:

flatMap 运算符允许这样做。我不完全理解您要做什么,但我会尽力提供答案...

如果你想加载所有的

getPostsPerUser() {
  return this.http.get('/users')
    .map(res => res.json())
    .flatMap((result : Array<User>) => {
      return Observable.forkJoin(
        result.map((user : User) => user.getPosts());
    });
}

Observable.forkJoin 允许您等待所有可观察对象都收到数据。

上面的代码假设user.getPosts()返回一个observable...

这样,您将收到一组帖子:

this.getPostsPerUser().subscribe(result => {
  var postsUser1 = result[0];
  var postsUser2 = result[1];
  (...)
});

【讨论】:

  • 是的,getPosts() 返回一个Observable&lt;Array&lt;Post&gt;&gt;。所以我想这就是我要找的。我会尽快尝试并让你知道它是否有效。谢谢
  • 完美运行。我只需要用forkJoin 替换combineLatest。非常感谢!
【解决方案2】:

你可以在你想要的rxjs方法上使用函数apply,像这样:

const source1 = Rx.Observable.interval(100)
  .map(function (i) { return 'First: ' + i; });

const source2 = Rx.Observable.interval(150)
  .map(function (i) { return 'Second: ' + i; });

const observablesArray = [source1, source2];

const sources = Rx.Observable.combineLatest
.apply(this, observablesArray).take(4)

/* or you can write it without "apply" this way:
const sources = Rx.Observable
                .combineLatest(...observablesArray)
                .take(4)
*/
sources.subscribe(
  (response) => {
    console.log(response);
  }
)
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"&gt;&lt;/script&gt;

【讨论】:

  • 我想这应该可以,但我现在会坚持使用forkJoin
【解决方案3】:

这里是一些使用 rxjs@6.3.3 的示例代码

import { of, forkJoin} from 'rxjs';
import { Observable } from 'rxjs'

const source:Array<Observable<String>> = [of('A'), of('B'), of('C') ];

forkJoin(source).subscribe( (x)=> console.log(x))

https://arrayofobservablesexamp.stackblitz.io

【讨论】:

    【解决方案4】:

    在这个 observables 数组上使用 forkJoin,那么它将是一个 observable 数组。

    【讨论】:

      【解决方案5】:

      使用toArray()

      range(0, 7).pipe(
        concatMap(v => of(`day_${v}`)),
        toArray() // here
      ).subscribe(a => {
        console.log(a); // ['day_0', 'day_1', ... 'day_6']
      });
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2018-11-09
        • 2017-12-13
        • 2020-04-24
        • 2022-12-01
        • 2022-12-01
        • 2021-04-15
        • 1970-01-01
        • 2022-12-01
        相关资源
        最近更新 更多