【发布时间】:2020-11-27 16:41:00
【问题描述】:
我正在构建一个基于CDC 的应用程序,它使用Mongo Change Streams 来侦听更改事件并近乎实时地索引elasticsearch 中的更改。
到目前为止,我已经实现了一个 worker,它调用一个函数来捕获事件、转换它们并在 elasticsearch 中对它们进行索引,在为 1 个 mongo 集合实现流时没有问题:
function syncChangeEvents() {
const stream = ModelA.watch()
while (!stream.isClosed()) {
if (await stream.hasNext()) {
const event = stream.next()
// transform event
// index to elasticsearch
}
}
}
我已经使用无限循环(可能是一种不好的方法)实现了它,但我不确定当我必须保持更改流永远存在时还有什么替代方案。
当我必须为另一个模型实现更改流时,问题就出现了。由于第一个函数有一个阻塞的 while 循环,worker 无法调用第二个函数来启动第二个更改流。
我想知道最好的方法是启动一个可以触发 x no 的工人。在不影响每个更改流的性能的情况下更改流。工作线程是正确的方法吗?
【问题讨论】:
-
你为什么不订阅呢?
stream.on('change', next => { // process next document }); -
感谢@GwenM,该方法也有效。您如何建议让工作人员保持活动状态,以便我可以创建多个流来监听不同的集合?
标签: node.js mongodb elasticsearch worker changestream