【问题标题】:How to properly flatten an stream of observables?如何正确展平可观察的流?
【发布时间】:2020-06-20 02:14:33
【问题描述】:

这是我想要做的:

给定一个分页 API,使用并行请求获取所有资源。

API 每次调用返回有限数量的资源。所以,你需要使用一个偏移量参数来获取下一组数据,直到所有数据都被提取出来。

这是我的想法(但收到一些警告,因为我在响应中使用 flat),所以也许有更好的方法来做到这一点。

  1. 获取项目总数。
  2. 给定计数和限制,计算获取所有数据需要多少请求。
  3. 并行触发所有请求并将所有数据合并到一个扁平数组中。

这是一个例子:

https://stackblitz.com/edit/paginated-api?embed=1&file=index.ts&hideExplorer=1&devtoolsheight=100

getCount().pipe(
  mergeMap(count => range(0, Math.ceil(count / limit))),
  map(offset => getDevices(offset, limit)),
  combineAll(),
).subscribe(res => {
  const a = res.flat(); // <--- warning: Property 'flat' does not exist on type '{ name: string; }[][]'.
  console.log(JSON.stringify(a));
});

我觉得这个解决方案有点 hacky。它使订阅中的响应变平。我想知道是否有可以在管道上使用的 RXJS 运算符来展平响应,所以我不必在订阅中使用?

【问题讨论】:

  • 请给minimal reproducible example什么警告?
  • @jonrsharpe 添加了一个示例和我收到的警告
  • @jonrsharpe 很好地消除警告。但是,我觉得在订阅中展平数组有点 hacky。我想知道是否有可以在管道上使用的 RXJS 运算符来展平响应,所以我不必在订阅中使用?
  • RxJS 不关心数组,它处理 observables。但是,如果您的工作代码您认为可以改进,请参阅Code Review

标签: javascript typescript rxjs


【解决方案1】:

对于每个内部的 Observable,我们需要另一个扁平化操作符。

所以这样的事情会起作用:

getCount().pipe(

  mergeMap(count => range(0, Math.ceil(count / limit))),

  mergeMap(offset => getDevices(offset, limit)),

  mergeAll(),
  toArray()

).subscribe(res => {
  console.log('result', JSON.stringify(res));
});

第一个 mergeMap 将内部的 range Observable 变平。第二个mergeMap 使getDevices 变平,我假设它返回一个Observable。

mergeAll() 合并所有单独的值,即对象。

toArray() 然后将所有对象添加到单个数组中。

这是结果:

result
[{"name":"dev-1"},{"name":"dev-2"},{"name":"dev-3"},{"name":"dev-4"},{"name":"dev-5"},{"name":"dev-6"},{"name":"dev-7"},{"name":"dev-8"},{"name":"dev-9"},{"name":"dev-10"},{"name":"dev-11"},{"name":"dev-12"},{"name":"dev-13"},{"name":"dev-14"},{"name":"dev-15"},{"name":"dev-16"},{"name":"dev-17"},{"name":"dev-18"},{"name":"dev-19"},{"name":"dev-20"}]

希望这会有所帮助。

【讨论】:

  • 非常感谢您的回答@DeborahK!正是我想要的。顺便说一句,我在 ng-conf 中参加了你的演讲!一定要在 Pluralsight 中学习 RXJS 课程! :)
【解决方案2】:

三种方法:

使用mergeMap,所有请求都是并行触发的。但是,最终结果将基于到达的顺序。这意味着如果您的 API 已排序,这可能会破坏它。

const getAllOffsets = () => pipe(
  mergeMap((count: number) => range(0, Math.ceil(count / limit))),
  toArray(),
  // all requests in parallel and results in order (order of creation)
  concatMap(r => forkJoin(...r.map(offset => getDevices(offset, limit)))),
  map(a => a.flat())
);

使用concatMap,这样可以保证订单。但是,每个请求都会一个接一个地执行。这将是一个性能问题

const getAllOffsets2 = () => pipe(
  mergeMap((count: number) => range(0, Math.ceil(count / limit))),
  // all requests in parallel but order is not guarantee (response order)
  concatMap(offset => getDevices(offset, limit)),
  mergeAll(),
  toArray()
);

最后,使用forkJoin。这将并行执行所有请求并保持它们的创建顺序。喜欢Promise.all。这是从分页 API 获取所有资源的最佳选择。

const getAllOffsets3 = () => pipe(
  mergeMap((count: number) => range(0, Math.ceil(count / limit))),
  toArray(),
  // all requests in parallel and results in order (order of creation)
  concatMap(r => forkJoin(...r.map(offset => getDevices(offset, limit)))),
  map(a => a.flat())
);

完整的工作示例: https://stackblitz.com/edit/paginated-api-ozcgwg?file=index.ts&hideExplorer=1&devtoolsheight=100

getCount().pipe(getAllOffsets3()).subscribe(res => {
  console.log({res, size: res.length || 0});
  console.log(JSON.stringify(res))
  // console.log('end', res.map(d => d.name));
});

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-04-16
    • 1970-01-01
    • 2019-04-06
    • 1970-01-01
    • 2014-11-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多