【发布时间】:2022-01-27 23:04:51
【问题描述】:
我有一些代码从数据库中读取,迭代每一行数据并对其执行一些逻辑,然后创建一个可观察对象,然后写入数据库,将其添加到一个数组中(创建一个可观察对象数组),这样当通过 forkJoin 订阅 observables 数组时,所有必要的数据都会写入数据库。
这似乎工作得很好,直到数组中的可观察对象的数量变得非常大。行数可以在 0-6000 之间,因此数组的大小可以增长到这个值。当它达到这个大小时,observable 不再写入数据库,而是从defaultIfEmpty 返回默认值。我很困惑为什么它在少量可观察的情况下正常工作,但在大量的情况下突然变空......
通过代码示例可能会更清楚一点
function writeToDB() {
// rows taken from the database, n = 0..6000
data = []
// array of observables
observables = []
for (const row of data) {
if (row.age > 20) {
// websocket between service and database, returns an observable
const observable = websocket.put(row).pipe(
o$.catchError((err) => {
return r$.of(err)
}),
o$.defaultIfEmpty({
success: true,
status: 200
})
);
observables.push(observable);
}
}
return forkJoin([...observables]);
}
在订阅时使用此示例非常有效,但数组 observables 的长度约为 5000 的大型数据集除外。那时它开始返回defaultIfEmpty 值{ success: true, status: 200 },我无法解释为什么......任何帮助或建议将不胜感激。
【问题讨论】:
-
我认为 forkjoin 会立即触发所有可能达到自己最大处理能力的套接字
标签: rxjs rxjs-observables