【问题标题】:Firestore Concurrent Transactions FreezingFirestore 并发事务冻结
【发布时间】:2021-10-19 15:20:02
【问题描述】:

我遇到了 Firestore 事务在同时运行时冻结的问题。作为背景,我有一个 Firestore 集合,其中每个文档都有一个美元金额和一个时间。我正在创建一个函数,从这个集合中释放所需的美元金额,从最旧的文档开始。

例如,释放 $150 的函数调用将遍历集合,从集合中移除美元金额,直到移除总计 150 美元。我使用递归函数执行此操作,该函数 1) 找到最旧的美元金额,2) 从该数字中删除输入的金额(即 150 美元),或者如果输入的金额大于该数字,则删除该数字,以及 3) 如果存在则重复仍然是要删除的剩余金额。我在步骤 (2) 中使用 Firestore 事务,因为这个集合可能会被多个用户同时更改(请注意,如果我结合 (1) 和 (2) 以将查询包含在事务中,代码行为不变)。

下面的代码正确更新了集合。但是,如果在较早的实例已经运行时调用它,则需要很长时间:如果我调用它一次,然后在第一次调用完成之前再次调用它,它会冻结并有可能需要 20-30 分钟而不是通常的 1-5 秒。虽然很明显并发事务之间的争用导致了冻结(当我删除事务的写入部分时,没有冻结),但未知的问题是争用具体是什么导致了冻结以及如何解决它。

补充:看来这次冻结可能与https://github.com/firebase/firebase-tools/issues/2452有关。与那篇文章一致,我面临每笔交易冻结 30 秒的问题,鉴于单个版本有多个交易,这将变成很多分钟。

 function releaseAmountFromStack(amount) {
  return new Promise((resolve, reject) => {
    let db = admin.firestore();
    let stackRef = db.collection("stack");

    stackRef.orderBy("expirationTime", "asc").limit(1)
      .get().then((querySnapshot) => {
        if(querySnapshot.empty) {
          return reject("None left in stack");
        }

        let itemToRelease = querySnapshot.docs[0];

        releaseItem(itemToRelease.ref, amount)
        .then((actualReleaseAmount) => {
          // If there is still more to release, trigger the next recursion
          // If the full amount has been released, return it
          if (amount > actualReleaseAmount) {
            releaseAmountFromStack(amount-actualReleaseAmount)
            .then((nextActualReleaseAmount) => {
              return resolve(actualReleaseAmount + nextActualReleaseAmount);
            })
            .catch(() => {
              return resolve(actualReleaseAmount);
            });
          } else {
            return resolve(actualReleaseAmount);
          }
        });
    });
  });
}

function releaseItem(itemRef, amountToRelease) {
  let db = admin.firestore();
  return db.runTransaction((transaction) => {
    return transaction.get(itemRef).then((itemDoc) => {
      let itemAmount = itemDoc.data().amount;
      let actualReleaseAmount = Math.min(amountToRelease, itemAmount);

      // If item is exhausted, delete it. Else, update amount
      if (actualReleaseAmount >= itemAmount) {
        transaction.delete(itemDoc.ref);
      } else {
        transaction.set(itemDoc.ref, {
          amount: admin.firestore.FieldValue.increment(-1*Number(actualReleaseAmount)),
        }, {merge: true});
      }
      return actualReleaseAmount;
      });
  });
}

以下是迄今为止调试过程中的一些有用事实。非常感谢。

  • 在冻结期间,它不会在任何这些代码行上触发断点。只有当冻结完成时才会触发断点。这表明延迟不是由循环我的代码引起的(如果是,应该触发断点)
  • 该函数最终会按预期工作,因为它释放了正确的数量,只是需要很长时间。它通常会冻结,然后执行,然后冻结,然后执行,等等,直到该过程完成
  • Firestore 使用统计数据显示该函数执行数百次读取和写入,即使它只需要(而且我希望)迭代几十次以从集合中释放必要的数量

【问题讨论】:

  • 代码可能会产生竞争条件。
  • 谢谢。您认为哪个方面会导致竞态条件?根据我的测试,逻辑似乎可行,只是每笔交易需要 30 秒,原因不明。

标签: javascript firebase google-cloud-firestore transactions google-cloud-functions


【解决方案1】:

首先,让我们修复您的 releaseAmountFromStack 函数,这样您就不会使用 Promise 构造函数来返回 Promise 的 API(称为 Explicit Promise Construction Antipattern)。如果您的stackRef 查询或releaseItem 函数抛出错误,您的代码将遇到UnhandledPromiseRejection,因为Promise 链都没有catch 处理程序。

function releaseAmountFromStack(amount) {
  const db = admin.firestore();
  const stackRef = db.collection("stack");

  return stackRef
    .orderBy("expirationTime", "asc")
    .limit(1)
    .get()
    .then((querySnapshot) => {
      if(querySnapshot.empty) {
        return Promise.reject("Out of stock");
      }

      const itemToRelease = querySnapshot.docs[0];

      return releaseItem(itemToRelease.ref, amount);
    })
    .then((releasedAmount) => {
       const amountLeft = amount - releasedAmount;

       if (amountLeft <= 0)
         return releasedAmount;

       return releaseAmountFromStack(amountLeft)
         .then((nextReleasedAmount) => nextReleasedAmount + releasedAmount)
         .catch((err) => {
           if (err === "Out of stock") {
             return releasedAmount;
           } else {
             // rethrow unexpected errors
             throw err;
           }
         });
     });
}

由于该函数涉及嵌套的 Promise 链,因此切换到 async/await 语法可以将其扁平化为:

async function releaseAmountFromStack(amount) {
  const db = admin.firestore();
  const stackRef = db.collection("stack");

  const querySnapshot = await stackRef
    .orderBy("expirationTime", "asc")
    .limit(1)
    .get();
  
  if (querySnapshot.empty)
    return 0; // out of stock

  const itemToRelease = querySnapshot.docs[0];

  const releasedAmount = await releaseItem(itemToRelease.ref, amount);

  const amountLeft = amount - releasedAmount;
  
  if (amountLeft <= 0) {
    // nothing left to release, return released amount
    return releasedAmount; 
  }

  // If here, there is more to release, trigger the next recursion
  const nextReleasedAmount = await releaseAmountFromStack(amountLeft);

  return nextReleasedAmount + releasedAmount;
}

注意:在上面的扁平化版本中,我们不需要处理 catch,因为我们可以直接返回 0。这意味着任何不相关的错误都会正常抛出。

接下来我们可以转到releaseItem,这是您问题的真正原因。在这里,您没有处理另一个实例正在删除您正在阅读的项目的情况,您只是假设它存在,然后最终处理NaN 和/或负余额。作为最终发生的一个示例,您可以得到以下事件序列(它比这更复杂,因为您使用的是FieldValue.increment() - 添加一个服务器客户端也执行事务):

 Client A                      Server                      Client B
Release 50                                                Release 80
    ┌┐         Get Doc #1        ┌┐                           ┌┐
    ││     ────────────────►     ││                           ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││      Doc #1 Snapshot      ││         Get Doc #1        ││
    ││       { amount: 10 }      ││     ◄────────────────     ││
    ││     ◄────────────────     ││                           ││
    ││                           ││                           ││
    ││                           ││      Doc #1 Snapshot      ││
    ││                           ││       { amount: 10 }      ││
    ││   Result: Delete Doc #1   ││     ────────────────►     ││
    ││     ────────────────►     ││                           ││
    ││          ACCEPTED         ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││   Result: Delete Doc #1   ││
    ││         Get Doc #2        ││     ◄────────────────     ││
    ││     ────────────────►     ││                           ││
    ││                           ││         REJECTED          ││
    ││                           ││    New Doc #1 Snapshot    ││
    ││                           ││          <null>           ││
    ││      Doc #2 Snapshot      ││     ────────────────►     ││
    ││      { amount: 15 }       ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││                           ││
    ││                           ││ Result: Set Doc #1 to -10 ││
    ││                           ││     ◄────────────────     ││
    ││   Result: Delete Doc #2   ││             ▲             ││
    ││     ────────────────►     ││             │             ││
    ││          ACCEPTED         ││           ERROR           ││
    ││     ◄────────────────     ││                           ││
    └┘                           └┘                           └┘

因为您正在使用交易,您已经知道amount 的当前值,因此您可以在客户端上进行计算并写入新金额,而不是使用FieldValue.increment()。该运算符更适合对不需要知道当前值的值进行简单更新。

function releaseItem(itemRef, amountToRelease) {
  const db = admin.firestore();
  return db.runTransaction((transaction) => {
    return transaction.get(itemRef).then((itemDoc) => {
      if (!itemDoc.exists) {
        // target has been deleted, do nothing & return
        // amount that was released (0)
        return 0;
      }
  
      const itemAmount = itemDoc.get("amount");
      const actualReleaseAmount = Math.min(amountToRelease, itemAmount);

      if (actualReleaseAmount >= itemAmount) {
        // exhausted supply. delete item
        transaction.delete(itemDoc.ref);
      } else {
        // have leftover supply. update amount
        transaction.set(itemDoc.ref, {
          amount: itemAmount - actualReleaseAmount, 
        }, {merge: true});
      }

      // return amount that was released
      return actualReleaseAmount;
    });
  });
}

这不会完全解决您的问题,因为如果两个或多个客户端尝试同时从同一个堆栈中取出项目,您仍然会遇到数据争用问题。作为这方面的一个示例,请参阅以下事件流程(再次,时间将成为事情如何发展的主要因素):

 Client A                      Server                      Client B
Release 50                                                Release 80
    ┌┐         Get Doc #1        ┌┐                           ┌┐
    ││     ────────────────►     ││                           ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││      Doc #1 Snapshot      ││         Get Doc #1        ││
    ││       { amount: 10 }      ││     ◄────────────────     ││
    ││     ◄────────────────     ││                           ││
    ││                           ││                           ││
    ││                           ││      Doc #1 Snapshot      ││
    ││                           ││       { amount: 10 }      ││
    ││   Result: Delete Doc #1   ││     ────────────────►     ││
    ││     ────────────────►     ││                           ││
    ││          ACCEPTED         ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││   Result: Delete Doc #1   ││
    ││         Get Doc #2        ││     ◄────────────────     ││
    ││     ────────────────►     ││                           ││
    ││                           ││         REJECTED          ││
    ││                           ││    New Doc #1 Snapshot    ││
    ││                           ││          <null>           ││
    ││      Doc #2 Snapshot      ││     ────────────────►     ││
    ││      { amount: 15 }       ││                           ││
    ││     ◄────────────────     ││     Result: Cancelled     ││
    ││                           ││     ◄────────────────     ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││   Result: Delete Doc #2   ││         Get Doc #2        ││
    ││     ────────────────►     ││     ◄────────────────     ││
    ││          ACCEPTED         ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││      Doc #2 Snapshot      ││
    ││         Get Doc #3        ││          <null>           ││
    ││     ────────────────►     ││     ────────────────►     ││
    ││                           ││                           ││
    ││                           ││     Result: Cancelled     ││
    ││                           ││     ◄────────────────     ││
    ││      Doc #3 Snapshot      ││                           ││
    ││      { amount: 10 }       ││                           ││
    ││     ◄────────────────     ││         Get Doc #3        ││
    ││                           ││     ◄────────────────     ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││   Result: Delete Doc #3   ││      Doc #3 Snapshot      ││
    ││     ────────────────►     ││          <null>           ││
    ││          ACCEPTED         ││     ────────────────►     ││
    ││     ◄────────────────     ││                           ││
    ││                           ││     Result: Cancelled     ││
    ││         Get Doc #4        ││     ◄────────────────     ││
    └┘     ────────────────►     └┘                           └┘

解决此问题的一种方法是一次拉下多个项目,适当地使用它们并将更改写回,所有这些都在一个事务中。再说一次,不处理问题,只是减少客户一遍又一遍地争夺相同文件的可能性。

async function releaseAmountFromStack(amount) {
  const db = admin.firestore();
  const stackQuery = db
    .collection("stack")
    .orderBy("expirationTime", "asc")
    .limit(1);

  const releasedAmount = await _releaseAmountFromStack(stackQuery, amount);

  const amountLeft = amount - releasedAmount;

  if (amountLeft <= 0) {
    // nothing left to release, return released amount
    return releasedAmount; 
  }

  // If here, there is more to release, trigger the next recursion
  const nextReleasedAmount = await releaseAmountFromStack(amountLeft)
    .catch((err) => {
      if (err !== "Empty stack") throw err;
      return 0;
    });

  return nextReleasedAmount + releasedAmount;
}

function _releaseAmountFromStack(query, amountToRelease) => {
  return db.runTransaction((transaction) => {
    return transaction.get(query).then((querySnapshot) => {
      if (!querySnapshot.empty) {
        // nothing in stack, return released amount (0)
        return Promise.reject("Empty stack");
      }

      let remainingAmountToRelease = amountToRelease;

      for (const doc of querySnapshot.docs) {
        const itemAmount = doc.get("amount");
        const amountChange = Math.min(itemAmount, remainingAmountToRelease);

        if (amountChange >= itemAmount) {
          transaction.delete(doc.ref);
        } else {
          remainingAmountToRelease -= amountChange;
          transaction.set(doc.ref, {
            amount: itemAmount - amountChange
          }, { merge: true });
        }

        if (remainingAmountToRelease <= 0) break; // stop iterating early
      }
  
      // return amount that was released
      return /* totalAmountReleased = */ amountToRelease - remainingAmountToRelease;
    });
  });
}

【讨论】:

  • 感谢 Sam 的彻底回复。但是,在我进行了您提到的更改之后,虽然这确实使代码样式更清晰,但它不会影响争用冻结。正如您所列出的那样,存在争用,但我仍然不清楚为什么这种争用会导致冻结 - 正如您的示例所示,它会在一些争用之后正确执行(这也与我的日志一致 - 它执行为预期的,但需要几分钟,目前尚不清楚是什么导致了如此长的运行时间)。您是否知道可能导致潜在问题的原因是什么?
【解决方案2】:

我找出了导致难以捉摸的冻结的原因。我注意到每笔交易的冻结时间都是 30 秒,这促使我搜索了关于 30 秒交易冻结的讨论。我发现 (https://github.com/firebase/firebase-tools/issues/2452) 表明这是 Firebase 模拟器如何处理并发事务的问题。确实,当我部署代码而不是使用模拟器时,冻结不再存在!

TLDR:Firebase 模拟器对并发事务有不自然的延迟。

【讨论】:

    猜你喜欢
    • 2014-11-16
    • 1970-01-01
    • 2015-01-10
    • 2015-06-11
    • 1970-01-01
    • 2020-07-17
    • 2011-07-05
    • 2016-11-03
    • 1970-01-01
    相关资源
    最近更新 更多