【问题标题】:Why is my RxJS Observable completing right away?为什么我的 RxJS Observable 立即完成?
【发布时间】:2016-09-27 19:26:58
【问题描述】:

我对 RxJS 有点陌生,它让我很头疼,所以我希望有人能提供帮助!

我在我的 express 服务器上使用 RxJS(5) 来处理我必须保存一堆 Document 对象然后将每个对象通过电子邮件发送给他们的收件人的行为。我的documents/create 端点中的代码如下所示:

    // Each element in this stream is an array of `Document` model objects: [<Document>, <Document>, <Document>] 
    const saveDocs$ = Observable.fromPromise(Document.handleCreateBatch(docs, companyId, userId));

    const saveThenEmailDocs$ = saveDocs$
      .switchMap((docs) => sendInitialEmails$$(docs, user))
      .do(x => {
        // Here x is the `Document` model object
        debugger;
      });

    // First saves all the docs, and then begins to email them all.
    // The reason we want to save them all first is because, if an email fails,
    // we can still ensure that the document is saved
    saveThenEmailDocs$
      .subscribe(
        (doc) => {
          // This never hits
        },
        (err) => {},
        () => {
          // This hits immediately.. Why though?
        }
      );

sendInitialEmails$$ 函数返回一个 Observable,如下所示:

  sendInitialEmails$$ (docs, fromUser) {
    return Rx.Observable.create((observer) => {

      // Emails each document to their recepients
      docs.forEach((doc) => {
        mailer.send({...}, (err) => {
          if (err) {
            observer.error(err);
          } else {
            observer.next(doc);
          }
        });
      });

      // When all the docs have finished sending, complete the
      // stream
      observer.complete();
    });
  });

问题是当我订阅saveThenEmailDocs$ 时,我的next 处理程序永远不会被调用,它会直接转到complete。我不知道为什么...相反,如果我从sendInitialEmails$$ 中删除observer.complete() 调用,则每次都会调用next 处理程序,并且永远不会调用订阅中的complete 处理程序。

为什么 next next complete 的预期行为没有发生,而是其中之一......我错过了什么吗?

【问题讨论】:

  • 嗨,约翰尼,你有没有解决过这个问题?我知道我回答晚了,但最终是你的解决方案吗?

标签: node.js express reactive-programming observable rxjs5


【解决方案1】:

我只能假设mailer.send 是一个异步调用。 您的observer.complete() 在所有异步调用都已启动但在其中任何一个调用完成之前被调用。

在这种情况下,我会从 docs 数组中创建一个可观察值流,而不是像这样包装它。

或者,如果您想手动将其包装到 observable 中,我建议您查看库 async 并使用

async.each(docs, function(doc, callback) {...}, function finalized(err){...})

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-07-30
    • 2019-02-05
    • 1970-01-01
    • 2017-06-14
    • 2020-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多