【问题标题】:Watch document creation with MongoDB Streams使用 MongoDB Streams 观看文档创建
【发布时间】:2019-05-06 04:35:20
【问题描述】:

我想在每次将特定类型的数据插入集合时使用 MongoDB 流来触发事件。

我已经找到了与我正在寻找的内容大致相似的东西,但这仅适用于更改流而不是插入。

知道如何完成这项工作吗?

我正在使用带有 NodejsMongodb 驱动程序来完成这项工作,因此我的代码将是这样的:

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
MongoClient.connect(uri, function(err, client) {

const db = client.db('mydb');

// Connect using MongoClient
var filter = [{
    $match: {
        $or: [
            { $or: [{"receipt.receiver": "newdexpocket"}, {"act.account": "newdexpocket"}] }]
    }
}];

var options = { fullDocument: 'updateLookup' };
db.collection('somecollection').watch(filter, options).on('create', data => 
  {
    console.log(data);
  });
});
  • 是否需要在过滤器中指定operationType
  • 我还需要获取完整文档,但显然 updateLookup 不是正确的工具,我应该使用什么?
  • on 事件可以使用哪些选项?我用过create,但我什至不确定它是否存在,是吗?

很抱歉所有这些问题,但我很难在官方文档中找到一些答案。

解决方案:

请注意不要在您的请求中忘记fullDocument ;-)

function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll);

  const insert_pipeline = [ { $match:
                    { 
                        operationType: 'insert',
                        $or: [
                            { "fullDocument.receipt.receiver": "newdexpocket" },
                            { "fullDocument.act.account": "newdexpocket" }
                        ]
                    }
                }];

  con.db(db).collection(coll).watch(insert_pipeline)
    .on('change', data => {
      console.log(data)
    });
}

async function run(uri) {
    try {
        con = await MongoClient.connect(uri, {"useNewUrlParser": true});
        watch_insert(con, 'EOS', 'action_traces');
    } catch (err) {
        console.log(err);
    }
}

【问题讨论】:

    标签: node.js mongodb mongodb-query


    【解决方案1】:

    您需要:

    1. 指定operationType: 'insert'。由于您不想监控更新,因此不需要updateLookup
    2. 为包含operationType 的过滤器创建适当的aggregation pipeline
    3. 聚合管道过滤由watch() 返回的文档。 Change Events page 中有一个示例输出。

    watch() 返回一个ChangeStream。它触发closechangeenderror 事件。详情请见ChangeStream

    这是一个完整的变更流示例,它侦听数据库test 上的insert 操作@ 集合test。它将输出具有字段{a: 1} ('fullDocument.a': 1) 的文档,并忽略更新、插入其他a 值或任何没有字段a 的内容。

    const MongoClient = require('mongodb').MongoClient
    const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
    
    const insert_pipeline = [
      {$match: {operationType: 'insert', 'fullDocument.a': 1}}
    ]
    
    function watch_insert(con, db, coll) {
      console.log(new Date() + ' watching: ' + coll)
      con.db(db).collection(coll).watch(insert_pipeline)
        .on('change', data => {
          console.log(data)
        })
    }
    
    async function run() {
      con = await MongoClient.connect(uri, {"useNewUrlParser": true})
      watch_insert(con, 'test', 'test')
    }
    
    run()
    

    【讨论】:

    • 非常感谢您的详细回答......这正在我的测试数据库上工作,但我的生产实例有问题。在生产环境中,我收到以下消息:The $changeStream stage is only supported on replica sets 因为我没有使用replicaSet... 我该如何解决这个问题?我的数据库已经接近 1TB,所以我无法在其上添加冗余,因为这对我来说成本太高(特别是考虑到如果数据库崩溃并不是什么大问题,因为在我的情况下这是一个临时数据库)。提前致谢,
    • 好的,我找到了答案,只需重新启动 mongo 添加 ?replset=rs0 (或任何东西)选项,然后 `rs.initiate()' 我很好;)再次感谢您的帮助,
    • 它还没有回答,查询仍然不起作用,我已经更新了我的原始帖子以添加一些额外的代码......你知道它为什么不起作用吗?
    • 当然 ;-) 该函数被调用并且观察者正在运行,但它根本没有在插入项目时赶上事件......我可以看到监听器在日志中正确运行但是请求本身有问题吗?可以肯定的是,on('change') 事件永远不会被触发。有什么想法吗?
    • 您认为请求本身有问题吗?
    猜你喜欢
    • 2017-05-18
    • 1970-01-01
    • 2015-07-05
    • 2019-06-09
    • 2016-11-11
    • 2018-04-04
    • 1970-01-01
    • 2017-08-03
    • 2020-05-16
    相关资源
    最近更新 更多