【问题标题】:rxjs - observable emits nothing (not completing?)rxjs - observable 什么都不发出(没有完成?)
【发布时间】:2019-02-05 17:32:08
【问题描述】:

我有一个 Angular 6 应用程序,我最初用一个 rest api 设置我的后端,但我开始转换部件以使用 socket.io。

当我从我的 rest api 返回数据时,以下工作:

this.http.get(api_url + '/versions/entity/' + entityId).pipe(
  mergeMap((versions:IVersion[]) => versions),
  groupBy((version:IVersion) => version.type),
  mergeMap(group => group.pipe(
    toArray(),
    map(versions=> {
      return {
        type: group.key,
        versions: versions
      }
    }),
    toArray()
  )),
  reduce((acc, v) => acc.concat(v), [])
);

特快路线:

router.get('/entity/:entityId', (req, res) => {
  const entityId = req.params.entityId;

  Version.getVersionsByEntity(entityId, (err, versions) => {
    if (err) {
      res.json({success: false, msg: err});
    } else {
      res.json(versions);
    }
  });
});

使用 mongoose 从我的 mongo 数据库中获取数据:

export function getVersionsByEntity(entityId, callback) {
  console.log('models/version - get versions by entity');

  Version.find({'entity.entityId': entityId})
          .exec(callback);
}

但是,当我使用 socket.io 进行完全相同的调用时,observable 不会返回任何内容。我猜是因为它永远不会完成?数据传输成功后,http 调用是否会发送“完成”消息?

从这个服务发送套接字:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    // create observable to list to refreshJobs message
    let observable = new Observable(observer => {
      this._socketService.socket.on('versionsByEntity', (data) => {
        observer.next(data);
      });
    });

    this._socketService.event('versionsByEntity', entityId);

    return <Observable<IVersion[]>> observable;
  }

从服务器调用相同的猫鼬函数。

从 (socket) 服务返回的 observable 确实包含数据,只有在我添加 toArray() 函数后,它才会在我订阅时打印任何内容..

有人可以帮我解决这个问题并解释这背后的理论吗?不是完成了吗? http 是否使用 Angular 发送“已完成”消息?

编辑:我创建了一个简单的 stackblitz,它正在做我想做的事,但我想从 _dataService 中删除 take(1),因为我可能想要更新数据,所以我想保持可观察的打开 - @987654321 @

编辑 2:我用扫描运算符替换 toArray 很接近,但它似乎为数组发射了两次。 reduce() 发出正确的数据,但似乎只在完成时发出(如 toArray),所以它没有更好 - https://stackblitz.com/edit/rxjs-toarray-problem-tpeguu

【问题讨论】:

    标签: angular group-by rxjs toarray


    【解决方案1】:

    处理事件流有点不同。我试图在下面提出一些代码:

    getVersionsByEntity(entityId): Observable<IVersion[]> {
        return defer(() => {
            this._socketService.event('versionsByEntity', entityId);
    
            return fromEvent(this._socketService.socket, 'versionsByEntity')
                .pipe(take(1))
        }) as Observable<IVersion[]>;
    }
    

    主要思想是将所有内容包装在defer 中,这将使Observable 变得懒惰,并且仅在订阅时调用socketService.event(您的原始实现急切地立即调用socketService.event)。急切地实现可能会产生意想不到的后果——如果 Observable 订阅得太晚,很容易错过事件。

    我还建议使用fromEvent Observable factory - 它处理事件侦听器的设置和拆除。

    最后在第一次发射后完成 Observable 我添加了take(1) - 这会将发射次数限制为 1 并取消订阅 Observable。

    【讨论】:

    • 感谢您的建议,一切都说得通,我会尽快落实您的建议
    【解决方案2】:

    除了@m1ch4ls 所说的之外,您还必须考虑toArray 的工作原理。 toArray 将 Observable 通知的所有数据转换为此类数据的数组。为了使其工作,Observable 必须完成。

    Angular http 客户端返回的 Observable 总是在第一次通知之后完成,因此 toArray 有效。 socket.io 流在关闭时完成,因此在这样的流上使用 toArray 可能会产生永远不会从 Observable 中获取任何值的效果,以防流未关闭。

    另一方面,如果您想在一个通知后关闭流,如果您使用take(1) 会发生这种情况,那么您最好考虑使用 http 请求。套接字流被认为是一种长期存在的通道,因此如果您必须始终在传输一条消息后关闭它们,则它不符合它们的性质。

    评论后的更新版本

    这是没有take的代码

     getData() {
        return this._dataService.getVersions().pipe(
          map(
            (versions: Array<any>) => versions.reduce(
              (acc, val) => {
                const versionGroup = (acc.find(el => el.type === val.type));
                if (versionGroup) {
                  versionGroup.versions.push(val)
                } else {
                  acc.push({type: val.type, versions: [val]})
                }
                return acc;
              }, []
            )
          ),
        )
    

    要理解的关键点是,您的服务正在向您返回 Array 的东西。为了达到您的结果,您可以使用Array 方法直接处理Array,而无需使用Observable 运算符。请不要将我上面使用的分组逻辑的代码视为正确的实现 - 真正的实现可能会受益于使用 lodash 之类的东西进行分组,但我不想让事情过于复杂。

    这是你的原始代码

    getData() {
        return this._dataService.getVersions().pipe(
          mergeMap((versions: Array<any>) => versions),
          groupBy((version:IVersion) => version.type),
          mergeMap(group => group.pipe(
            toArray(),
            map(versions=> {
              return {
                type: group.key,
                versions: versions
              }
            }),
            toArray()
          )),
          reduce((acc, v) => acc.concat(v), [])
        )
      }
    

    这是做什么的

    1. 您创建了一个对象流,将第一个 mergeMap 应用于 服务返回的数组
    2. 通过groupBy 运算符创建一个新的 Observable,它发出

    3. 根据您的逻辑分组的对象,然后您再输入一秒钟 复杂的mergeMap 接受groupBy 发出的数组, 把它们变成一个对象流,只是为了立即转换它们 通过第一个 toArray 再次到一个数组,然后 转换为 {type: string, versions: []} 类型的 Object,到 然后最终再次调用toArray 来创建一个Array of Array

    4. 您要做的最后一件事是运行reduce 来创建您的最终数组

    为什么它只适用于take?因为groupBy 仅在其源 Observable 完成时才会执行,因此您想要对一组有限的事物进行分组是有道理的。 reduce Observable 运算符也是如此。

    take 是完成源 Observable 的一种方式。

    【讨论】:

    • 感谢您提供此信息,这正是我所关心的。因此,当我通过我的套接字返回数据时,应该触发我可观察到的 next() 函数?并且 toArray 不会在 next() 上发出值吗?理想情况下,我不会使用 take(1),因为我可能想实时更新数据(这就是我使用套接字的原因)。我可以使用另一种方法吗?也许没有 toArray?
    • 这可能是@m1ch4ls 建议的那样,因为我有一个热切的可观察对象,它在我实际订阅它之前就发出了?我尝试使用行为主题而不是可观察对象,因为我认为这意味着始终保持最新值,但这也不起作用..
    • toArraysubscribe 删除到“关闭”你的套接字的 Observable 中。 subscribe 允许您定义 3 个函数:第一个是在任何时候调用 next 时调用的函数,第二个是管理错误,第三个是在 Obervable 完成时执行某些操作。
    • 我不确定在这种情况下是急切还是懒惰。我倾向于认为关闭套接字的 Observable 本质上是热的(渴望?)。如果您通过没有附加侦听器的套接字发送消息会发生什么?它只是迷路了。如果您稍后订阅套接字,您将只收到从那一刻起通过套接字发送的消息。相反,包装 http 请求的 Observables 是惰性的,因为它们在订阅 Observable 之前不会执行。
    • 我在 stackblitz 中创建了一个简单的示例。我如何编辑它以获得相同的结果但不使用 take(1) 以便我可以在需要时更新数据? - stackblitz.com/edit/rxjs-toarray-problem
    猜你喜欢
    • 2016-09-27
    • 2017-06-14
    • 2020-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-25
    • 2018-10-15
    • 1970-01-01
    相关资源
    最近更新 更多