【问题标题】:How to use MongoDB transaction using Mongoose?如何使用 Mongoose 使用 MongoDB 事务?
【发布时间】:2019-04-25 09:54:09
【问题描述】:

我正在使用 MongoDB Atlas cloud(https://cloud.mongodb.com/) 和 Mongoose 库。

我尝试使用事务概念创建多个文档,但它不起作用。 我没有收到任何错误。但是,回滚似乎无法正常工作。

app.js

//*** more code here

var app = express();

require('./models/db');

//*** more code here

models/db.js

var mongoose = require( 'mongoose' );

// Build the connection string
var dbURI = 'mongodb+srv://mydb:pass@cluster0-****.mongodb.net/mydb?retryWrites=true';

// Create the database connection
mongoose.connect(dbURI, {
  useCreateIndex: true,
  useNewUrlParser: true,
});

// Get Mongoose to use the global promise library
mongoose.Promise = global.Promise;

models/user.js

const mongoose = require("mongoose");

const UserSchema = new mongoose.Schema({
  userName: {
    type: String,
    required: true
  },
  pass: {
    type: String,
    select: false
  }
});

module.exports = mongoose.model("User", UserSchema, "user");

myroute.js

const db = require("mongoose");
const User = require("./models/user");

router.post("/addusers", async (req, res, next) => {

    const SESSION = await db.startSession();

    await SESSION.startTransaction();

    try {

          const newUser = new User({
            //*** data for user ***
          });
          await newUser.save();

          //*** for test purpose, trigger some error ***
          throw new Error("some error");

          await SESSION.commitTransaction();

          //*** return data 

    } catch (error) {
            await SESSION.abortTransaction();
    } finally {
            SESSION.endSession();
    }    

 });

上面的代码没有错误,但它仍然在数据库中创建用户。它假设回滚创建的用户并且集合应该是空的。

我不知道我在这里错过了什么。谁能告诉我这里出了什么问题?

应用、模型、架构和路由器位于不同的文件中。

【问题讨论】:

    标签: javascript node.js mongodb mongoose mongodb-query


    【解决方案1】:

    您需要在事务期间处于活动状态的所有读/写操作的选项中包含session。只有这样它们才能真正应用于您可以回滚它们的事务范围。

    作为一个更完整的列表,并且仅使用更经典的Order/OrderItems 建模,对于大多数具有一些关系事务经验的人来说应该非常熟悉:

    const { Schema } = mongoose = require('mongoose');
    
    // URI including the name of the replicaSet connecting to
    const uri = 'mongodb://localhost:27017/trandemo?replicaSet=fresh';
    const opts = { useNewUrlParser: true };
    
    // sensible defaults
    mongoose.Promise = global.Promise;
    mongoose.set('debug', true);
    mongoose.set('useFindAndModify', false);
    mongoose.set('useCreateIndex', true);
    
    // schema defs
    
    const orderSchema = new Schema({
      name: String
    });
    
    const orderItemsSchema = new Schema({
      order: { type: Schema.Types.ObjectId, ref: 'Order' },
      itemName: String,
      price: Number
    });
    
    const Order = mongoose.model('Order', orderSchema);
    const OrderItems = mongoose.model('OrderItems', orderItemsSchema);
    
    // log helper
    
    const log = data => console.log(JSON.stringify(data, undefined, 2));
    
    // main
    
    (async function() {
    
      try {
    
        const conn = await mongoose.connect(uri, opts);
    
        // clean models
        await Promise.all(
          Object.entries(conn.models).map(([k,m]) => m.deleteMany())
        )
    
        let session = await conn.startSession();
        session.startTransaction();
    
        // Collections must exist in transactions
        await Promise.all(
          Object.entries(conn.models).map(([k,m]) => m.createCollection())
        );
    
        let [order, other] = await Order.insertMany([
          { name: 'Bill' },
          { name: 'Ted' }
        ], { session });
    
        let fred = new Order({ name: 'Fred' });
        await fred.save({ session });
    
        let items = await OrderItems.insertMany(
          [
            { order: order._id, itemName: 'Cheese', price: 1 },
            { order: order._id, itemName: 'Bread', price: 2 },
            { order: order._id, itemName: 'Milk', price: 3 }
          ],
          { session }
        );
    
        // update an item
        let result1 = await OrderItems.updateOne(
          { order: order._id, itemName: 'Milk' },
          { $inc: { price: 1 } },
          { session }
        );
        log(result1);
    
        // commit
        await session.commitTransaction();
    
        // start another
        session.startTransaction();
    
        // Update and abort
        let result2 = await OrderItems.findOneAndUpdate(
          { order: order._id, itemName: 'Milk' },
          { $inc: { price: 1 } },
          { 'new': true, session }
        );
        log(result2);
    
        await session.abortTransaction();
    
        /*
         * $lookup join - expect Milk to be price: 4
         *
         */
    
        let joined = await Order.aggregate([
          { '$match': { _id: order._id } },
          { '$lookup': {
            'from': OrderItems.collection.name,
            'foreignField': 'order',
            'localField': '_id',
            'as': 'orderitems'
          }}
        ]);
        log(joined);
    
    
      } catch(e) {
        console.error(e)
      } finally {
        mongoose.disconnect()
      }
    
    })()
    

    所以我通常建议以小写形式调用变量session,因为这是所有操作都需要的“选项”对象的键名。将它保持在小写约定中也允许使用诸如 ES6 对象分配之类的东西:

    const conn = await mongoose.connect(uri, opts);
    
    ...
    
    let session = await conn.startSession();
    session.startTransaction();
    

    另外,猫鼬documentation on transactions 有点误导,或者至少它可能更具描述性。在示例中它所指的db 实际上是Mongoose Connection 实例,而不是底层Db 甚至mongoose 全局导入,因为有些人可能会误解这一点。请注意,在清单和上面的摘录中,这是从 mongoose.connect() 获得的,应该保留在您的代码中,作为您可以从共享导入中访问的内容。

    或者,您甚至可以随时通过mongoose.connection 属性在模块化代码中获取它, 建立连接之后。这在诸如服务器路由处理程序之类的东西中通常是安全的,因为在调用代码时将有一个数据库连接。

    代码还演示了session在不同模型方法中的用法:

    let [order, other] = await Order.insertMany([
      { name: 'Bill' },
      { name: 'Ted' }
    ], { session });
    
    let fred = new Order({ name: 'Fred' });
    await fred.save({ session });
    

    所有基于find() 的方法以及基于update()insert()delete() 的方法都有一个最终的“选项块”,其中需要此会话键和值。 save() 方法的唯一参数是这个选项块。这就是告诉 MongoDB 将这些操作应用于该引用会话上的当前事务的原因。

    以几乎相同的方式,在提交事务之前,任何对 find() 或类似的请求,如果未指定 session 选项,则在该事务进行时看不到数据的状态。修改后的数据状态仅在事务完成后可用于其他操作。请注意,这会影响 documentation 中的写入。

    当发出“中止”时:

    // Update and abort
    let result2 = await OrderItems.findOneAndUpdate(
      { order: order._id, itemName: 'Milk' },
      { $inc: { price: 1 } },
      { 'new': true, session }
    );
    log(result2);
    
    await session.abortTransaction();
    

    对活动事务的任何操作都将从状态中删除并且不会应用。因此,它们对之后的结果操作不可见。在此处的示例中,文档中的值递增,并将在当前会话中显示检索到的值5。但是在session.abortTransaction() 之后,文档的先前状态被恢复。请注意,任何未在同一会话上读取数据的全局上下文,除非已提交,否则不会看到该状态更改。

    这应该给出一般概述。可以添加更多复杂性来处理不同级别的写入失败和重试,但这已经在文档和许多示例中广泛涵盖,或者可以回答更具体的问题。


    输出

    作为参考,包含列表的输出如下所示:

    Mongoose: orders.deleteMany({}, {})
    Mongoose: orderitems.deleteMany({}, {})
    Mongoose: orders.insertMany([ { _id: 5bf775986c7c1a61d12137dd, name: 'Bill', __v: 0 }, { _id: 5bf775986c7c1a61d12137de, name: 'Ted', __v: 0 } ], { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") })
    Mongoose: orders.insertOne({ _id: ObjectId("5bf775986c7c1a61d12137df"), name: 'Fred', __v: 0 }, { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") })
    Mongoose: orderitems.insertMany([ { _id: 5bf775986c7c1a61d12137e0, order: 5bf775986c7c1a61d12137dd, itemName: 'Cheese', price: 1, __v: 0 }, { _id: 5bf775986c7c1a61d12137e1, order: 5bf775986c7c1a61d12137dd, itemName: 'Bread', price: 2, __v: 0 }, { _id: 5bf775986c7c1a61d12137e2, order: 5bf775986c7c1a61d12137dd, itemName: 'Milk', price: 3, __v: 0 } ], { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") })
    Mongoose: orderitems.updateOne({ order: ObjectId("5bf775986c7c1a61d12137dd"), itemName: 'Milk' }, { '$inc': { price: 1 } }, { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") })
    {
      "n": 1,
      "nModified": 1,
      "opTime": {
        "ts": "6626894672394452998",
        "t": 139
      },
      "electionId": "7fffffff000000000000008b",
      "ok": 1,
      "operationTime": "6626894672394452998",
      "$clusterTime": {
        "clusterTime": "6626894672394452998",
        "signature": {
          "hash": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=",
          "keyId": 0
        }
      }
    }
    Mongoose: orderitems.findOneAndUpdate({ order: ObjectId("5bf775986c7c1a61d12137dd"), itemName: 'Milk' }, { '$inc': { price: 1 } }, { session: ClientSession("80f827fe077044c8b6c0547b34605cb2"), upsert: false, remove: false, projection: {}, returnOriginal: false })
    {
      "_id": "5bf775986c7c1a61d12137e2",
      "order": "5bf775986c7c1a61d12137dd",
      "itemName": "Milk",
      "price": 5,
      "__v": 0
    }
    Mongoose: orders.aggregate([ { '$match': { _id: 5bf775986c7c1a61d12137dd } }, { '$lookup': { from: 'orderitems', foreignField: 'order', localField: '_id', as: 'orderitems' } } ], {})
    [
      {
        "_id": "5bf775986c7c1a61d12137dd",
        "name": "Bill",
        "__v": 0,
        "orderitems": [
          {
            "_id": "5bf775986c7c1a61d12137e0",
            "order": "5bf775986c7c1a61d12137dd",
            "itemName": "Cheese",
            "price": 1,
            "__v": 0
          },
          {
            "_id": "5bf775986c7c1a61d12137e1",
            "order": "5bf775986c7c1a61d12137dd",
            "itemName": "Bread",
            "price": 2,
            "__v": 0
          },
          {
            "_id": "5bf775986c7c1a61d12137e2",
            "order": "5bf775986c7c1a61d12137dd",
            "itemName": "Milk",
            "price": 4,
            "__v": 0
          }
        ]
      }
    ]
    

    【讨论】:

      【解决方案2】:

      我认为这是开始使用 mongoose 执行事务的最快方式

          const mongoose = require("mongoose");
      
          // starting session on mongoose default connection
          const session = await mongoose.startSession();
          mongoose.connection.transaction(async function executor(session) {
            try {
              // creating 3 collections in isolation with atomicity
              const price = new Price(priceSchema);
              const variant = new Variant(variantSchema);
              const item = new Item(itemSchema);
             
              await price.save({ session });
              await variant.save({ session });
              // throw new Error("opps some error in transaction");  
              return await item.save({ session });
            } catch (err) {
              console.log(err);
            }
           });
      

      【讨论】:

        猜你喜欢
        • 2015-07-22
        • 2021-01-07
        • 1970-01-01
        • 2020-06-20
        • 2015-08-07
        • 2018-12-30
        • 2015-08-22
        • 2017-09-09
        相关资源
        最近更新 更多