【问题标题】:Observable defaulting when not empty不为空时可观察到的默认值
【发布时间】:2022-01-27 23:04:51
【问题描述】:

我有一些代码从数据库中读取,迭代每一行数据并对其执行一些逻辑,然后创建一个可观察对象,然后写入数据库,将其添加到一个数组中(创建一个可观察对象数组),这样当通过 forkJoin 订阅 observables 数组时,所有必要的数据都会写入数据库。

这似乎工作得很好,直到数组中的可观察对象的数量变得非常大。行数可以在 0-6000 之间,因此数组的大小可以增长到这个值。当它达到这个大小时,observable 不再写入数据库,而是从defaultIfEmpty 返回默认值。我很困惑为什么它在少量可观察的情况下正常工作,但在大量的情况下突然变空......

通过代码示例可能会更清楚一点

function writeToDB() {
   // rows taken from the database, n = 0..6000
   data = []

   // array of observables
   observables = []

   for (const row of data) {
      if (row.age > 20) {
         // websocket between service and database, returns an observable
         const observable = websocket.put(row).pipe(
            o$.catchError((err) => { 
               return r$.of(err) 
            }),
            o$.defaultIfEmpty({
               success: true,
               status: 200
            })
         );

         observables.push(observable);
      }
   }

   return forkJoin([...observables]);
}

在订阅时使用此示例非常有效,但数组 observables 的长度约为 5000 的大型数据集除外。那时它开始返回defaultIfEmpty{ success: true, status: 200 },我无法解释为什么......任何帮助或建议将不胜感激。

【问题讨论】:

  • 我认为 forkjoin 会立即触发所有可能达到自己最大处理能力的套接字

标签: rxjs rxjs-observables


【解决方案1】:

您在此处显示的内容尚不清楚。尽管如此,如果这适用于较少数量的调用,那么websocket 很有可能在这些数量上表现出一些奇怪的行为。

值得尝试的事情可能是限制 websocket 调用的并发性。

function writeToDB(data) {
  // data contains rows taken from the database, n = 0..6000

  return from(data).pipe(

    filter(row => row.age > 20),

    map(row => websocket.put(row).pipe(
      
      catchError(err => of(err)),

      // last makes sure that mergeAll behaves like forkJoin
      last(undefined, {
         success: true,
         status: 200
      })

    )),

    // mergeAll lets you choose how many can run concurrently
    // for example, at most 50 websocket calls are made at
    // once here
    mergeAll(50),
    toArray()
  );
  
}

在这种情况下,我更喜欢map, mergeAll 而不是mergeMap(因为我认为你不太可能错过这个并发方面),但你可以使用任何一个。


function writeToDB(data) {
  // data contains rows taken from the database, n = 0..6000

  return from(data).pipe(

    filter(row => row.age > 20),

    mergeMap(row => websocket.put(row).pipe(
      
      catchError(err => of(err)),

      // last makes sure that mergeMap behaves like forkJoin
      last(undefined, {
         success: true,
         status: 200
      })

    ), 50), // <- sneaky! ;)

    toArray()
  );
  
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-02-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多