【问题标题】:How can I implement a nodeJS worker that streams data from mongo to elasticsearch?如何实现将数据从 mongodb 流式传输到 elasticsearch 的节点 JS 工作程序?
【发布时间】:2020-11-27 16:41:00
【问题描述】:

我正在构建一个基于CDC 的应用程序,它使用Mongo Change Streams 来侦听更改事件并近乎实时地索引elasticsearch 中的更改。

到目前为止,我已经实现了一个 worker,它调用一个函数来捕获事件、转换它们并在 elasticsearch 中对它们进行索引,在为 1 个 mongo 集合实现流时没有问题:

function syncChangeEvents() {
  const stream = ModelA.watch()
  while (!stream.isClosed()) {
    if (await stream.hasNext()) {
      const event = stream.next()
      // transform event
      // index to elasticsearch
    }
  }
}

我已经使用无限循环(可能是一种不好的方法)实现了它,但我不确定当我必须保持更改流永远存在时还有什么替代方案。

当我必须为另一个模型实现更改流时,问题就出现了。由于第一个函数有一个阻塞的 while 循环,worker 无法调用第二个函数来启动第二个更改流。

我想知道最好的方法是启动一个可以触发 x no 的工人。在不影响每个更改流的性能的情况下更改流。工作线程是正确的方法吗?

【问题讨论】:

  • 你为什么不订阅呢? stream.on('change', next => { // process next document });
  • 感谢@GwenM,该方法也有效。您如何建议让工作人员保持活动状态,以便我可以创建多个流来监听不同的集合?

标签: node.js mongodb elasticsearch worker changestream


【解决方案1】:

在 Node.js 中使用变更流有三种主要方式。

  1. 您可以使用 EventEmitter 的 on() 函数监控变更流。

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
     // We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
     // See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
     changeStream.on('change', (next) => {
         console.log(next);
     });
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
    
  2. 您可以使用hasNext() 监控变更流。

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // Set a timer that will close the change stream after the given amount of time
     // Function execution will continue because we are not using "await" here
     closeChangeStream(timeInMs, changeStream);
    
     // We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
     // If the change stream is closed, hasNext() will return false so the while loop will exit.
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs.
     while (await changeStream.hasNext()) {
         console.log(await changeStream.next());
     }
    
  3. 您可以使用 Stream API 监控变更流

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs
     changeStream.pipe(
         new stream.Writable({
             objectMode: true,
             write: function (doc, _, cb) {
                 console.log(doc);
                 cb();
             }
         })
     );
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
    

如果您的 MongoDB 数据库托管在 Atlas (https://cloud.mongodb.com) 上,最简单的做法是创建一个 Trigger。 Atlas 为您处理变更流代码的编程,因此您只需编写将转换事件并在 Elasticsearch 中为它们编制索引的代码。

有关使用变更流和触发器的更多信息,请访问in my blog postGitHub 上提供了上述所有 sn-ps 的完整代码示例。

【讨论】:

  • 谢谢@Lauren。我已经多次参考您的博文以了解如何实施变更流。对于非触发器解决方案,您如何建议让 Node.js 进程无限期地运行,以便它可以捕获流发出的所有事件?
  • 以上三种解决方案都在一定时间后显式关闭更改流。您可以省略这些调用以使更改流保持打开状态。在某些时候,您的应用程序可能会失去与更改流的连接。请务必存储恢复令牌,以便您可以在上次离开的位置重新打开更改流。有关详细信息,请参阅博文的这一部分:developer.mongodb.com/quickstart/…
  • 谢谢。我已经使用startAfter 选项实现了恢复更改流的逻辑。关于这一点,当startAfter 似乎是一个更好的选择时,mongodb 是否有任何理由继续支持resumeAfter 选项?
  • startAfter 是在 2019 年 8 月正式发布的 MongoDB 4.2 中添加的。resumeAfter 可能仍然在那里支持从以前版本升级的那些。
  • 好的,知道了。谢谢劳伦!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-05-26
  • 2016-07-06
  • 2017-05-05
  • 2020-04-10
  • 1970-01-01
  • 2022-06-16
  • 1970-01-01
相关资源
最近更新 更多