【问题标题】:Prevent concurrent processing in NodeJS防止 NodeJS 中的并发处理
【发布时间】:2018-10-08 23:17:11
【问题描述】:

我需要 NodeJS 来防止相同请求的并发操作。据我了解,如果 NodeJS 收到多个请求,会发生这种情况:

REQUEST1 ---> DATABASE_READ
REQUEST2 ---> DATABASE_READ
DATABASE_READ complete ---> EXPENSIVE_OP() --> REQUEST1_END
DATABASE_READ complete ---> EXPENSIVE_OP() --> REQUEST2_END

这会导致运行两个昂贵的操作。我需要的是这样的:

REQUEST1 ---> DATABASE_READ
DATABASE_READ complete ---> DATABASE_UPDATE
DATABASE_UPDATE complete ---> REQUEST2 ---> DATABASE_READ ––> REQUEST2_END
                         ---> EXPENSIVE_OP() --> REQUEST1_END

这就是它在代码中的样子。问题是应用程序开始读取缓存值和完成写入之间的窗口。在此窗口期间,并发请求不知道已经有一个具有相同 itemID 的请求在运行。

app.post("/api", async function(req, res) {
    const itemID = req.body.itemID

    // See if itemID is processing
    const processing = await DATABASE_READ(itemID)
    // Due to how NodeJS works, 
    // from this point in time all requests
    // to /api?itemID="xxx" will have processing = false 
    // and will conduct expensive operations

    if (processing == true) {
        // "Cheap" part
        // Tell client to wait until itemID is processed
    } else {
        // "Expensive" part
        DATABASE_UPDATE({[itemID]: true})
        // All requests to /api at this point
        // are still going here and conducting 
        // duplicate operations.
        // Only after DATABASE_UPDATE finishes, 
        // all requests go to the "Cheap" part
        DO_EXPENSIVE_THINGS();
    }
}

编辑

当然我可以这样做:

const lockedIDs = {}
app.post("/api", function(req, res) {
    const itemID = req.body.itemID
    const locked = lockedIDs[itemID] ? true : false // sync equivalent to async DATABASE_READ(itemID)
    if (locked) {
        // Tell client to wait until itemID is processed
        // No need to do expensive operations
    } else {
        lockedIDs[itemID] = true // sync equivalent to async DATABASE_UPDATE({[itemID]: true})
        // Do expensive operations
        // itemID is now "locked", so subsequent request will not go here
    }
}

lockedIDs 这里的行为类似于内存中的同步键值数据库。没关系,如果它只是一台服务器。但是如果有多个服务器实例呢?我需要有一个单独的缓存存储,比如 Redis。而且我只能异步访问 Redis。所以这行不通,很遗憾。

【问题讨论】:

  • 为什么要更新数据库作为获取请求的一部分?此外,您所要求的将大大降低您的应用程序性能和可伸缩性。我 99% 确定您实际上并不想要您认为自己想要的东西。
  • 数据库是指 Redis 缓存。我基本上想将 itemID 设置为{processing: true},这样后续的请求就会知道这个操作已经在处理中,他们不需要自己处理。他们只会等到一个进程完成,然后他们才能在数据库中找到输出。
  • 您还应该注意,您不应该让第二个请求等待第一个请求。您最终会遇到网关超时等问题。最好的办法是返回带有{process_1_done: false} 类消息的第二个请求,并让前端继续检查。
  • @nbwoodward 这就是我要解决的问题...第二个请求如何知道 process_1 已启动(阶段 0)、正在处理(阶段 1)或已完成(阶段2)?我试图通过保存processing = true 在第一个请求中做到这一点,因此当发出第二个请求时,它可以访问该值并知道 process_1 处于第 1 阶段。
  • 我仍然不明白您为什么要使用获取请求启动更新过程。无论它是哪个过程或您在哪里处理它。

标签: node.js express


【解决方案1】:

好的,让我来看看这个。

所以,我在这个问题上遇到的问题是,您已经将问题抽象得如此之多,以至于很难帮助您进行优化。目前尚不清楚您的“长时间运行的进程”在做什么,它在做什么将影响如何解决处理多个并发请求的挑战。您担心消耗资源的 API 在做什么?

从您的代码中,起初我猜想您正在开始某种长期运行的工作(例如文件转换或其他东西),但后来一些编辑和 cmets 让我认为这可能只是一个复杂的工作查询需要大量计算才能正确的数据库,因此您希望缓存查询结果。但我也可以看到它是其他东西,例如针对您正在聚合的一堆第三方 API 的查询或其他东西。每个场景都有一些细微差别,会改变最佳状态。

也就是说,我将解释“缓存”方案,如果您对其他解决方案之一更感兴趣,可以告诉我。

基本上,您已经在缓存的正确范围内。如果您还没有,我建议您查看cache-manager,它可以为这些场景稍微简化您的样板文件(让您设置缓存失效,甚至拥有多层缓存)。您缺少的部分是您基本上应该始终响应缓存中的任何内容,并在任何给定请求的范围之外填充缓存。使用您的代码作为起点,类似这样(为简单起见,省略所有 try..catch 和错误检查等):

// A GET is OK here, because no matter what we're firing back a response quickly, 
//      and semantically this is a query
app.get("/api", async function(req, res) {
    const itemID = req.query.itemID

    // In this case, I'm assuming you have a cache object that basically gets whatever
    //    is cached in your cache storage and can set new things there too.  
    let item = await cache.get(itemID)

    // Item isn't in the cache at all, so this is the very first attempt.  
    if (!item) {
        // go ahead and let the client know we'll get to it later. 202 Accepted should 
        //   be fine, but pick your own status code to let them know it's in process. 
        //   Other good options include [503 Service Unavailable with a retry-after 
        //   header][2] and [420 Enhance Your Calm][2] (non-standard, but funny)
        res.status(202).send({ id: itemID });

        // put an empty object in there so we know it's working on it. 
        await cache.set(itemID, {}); 

        // start the long-running process, which should update the cache when it's done
        await populateCache(itemID); 
        return;
    }
    // Here we have an item in the cache, but it's not done processing.  Maybe you 
    //     could just check to see if it's an empty object or not, but I'm assuming 
    //     that we've setup a boolean flag on the cached object for when it's done.
    if (!item.processed) {
        // The client should try again later like above.  Exit early. You could 
        //    alternatively send the partial item, an empty object, or a message. 
       return res.status(202).send({ id: itemID });
    } 

    // if we get here, the item is in the cache and done processing. 
    return res.send(item);
}

现在,我不知道你所有的东西到底是做什么的,但如果是我,上面的populateCache 是一个非常简单的函数,它只调用我们用来做长期工作的任何服务,然后将其放入缓存中。

async function populateCache(itemId) {
   const item = await service.createThisWorkOfArt(itemId);
   await cache.set(itemId, item); 
   return; 
}

如果不清楚,或者您的情况与我的猜测确实不同,请告诉我。

如 cmets 中所述,这种方法将涵盖您描述的场景中可能遇到的大多数正常问题,但它仍然允许两个请求都触发长时间运行的进程,如果它们的进入速度比写入速度快您的缓存存储(例如 Redis)。我认为发生这种情况的可能性非常低,但如果您真的对此感到担忧,那么下一个更加偏执的版本将是简单地从您的 Web API 中完全删除长时间运行的进程代码。相反,您的 API 只记录有人请求发生这些事情,如果缓存中没有任何内容,则像我上面所做的那样响应,但完全删除实际调用 populateCache 的块。

相反,您将运行一个单独的工作进程,该进程将定期(多久取决于您的业务案例)检查缓存中是否存在未处理的作业并启动处理它们的工作。通过这种方式,即使您对同一项目有 1000 个并发请求,您也可以确保只处理一次。当然,缺点是您将检查的周期性添加到延迟获取完全处理的数据中。

【讨论】:

  • 感谢您的帮助!该场景是一种组合——大量第三方 API 请求、计算和复杂(但增量)的聚合查询。如果有多个并发请求,您的代码中的什么内容会阻止服务多次运行?
  • 好吧,这种情况并不能保护你免受真正的并发请求(比如在完全相同的纳秒),但坦率地说,如果不将自己限制在一个进程中,这是不可能做到的,比如我之前说过。但是,这种情况可以很好地防止快速请求,特别是如果您的缓存写入和读取速度非常快,就像使用 Redis 或 Memcached 一样。正如我提到的,一个小的改进是做一个多层缓存,其中一个是内存缓存,但这仅适用于针对同一进程的多个请求。
  • 好的,@lukas 添加了更多内容,让自己更加偏执于不多次执行相同的工作。
【解决方案2】:

您可以创建一个本地 Map 对象(在内存中用于同步访问),其中包含任何 itemID 作为正在处理的键。您可以使该键的值成为一个承诺,无论结果来自以前处理过该键的任何人的任何结果,都可以解决。我认为这就像一个看门人。它会跟踪正在处理的 itemID。

此方案告诉未来对相同 itemID 的请求等待并且不会阻止其他请求 - 我认为这很重要,而不仅仅是对与 itemID 处理相关的所有请求使用全局锁定。

然后,作为处理的一部分,您首先检查本地 Map 对象。如果该密钥在那里,那么它当前正在处理中。然后,您可以等待来自 Map 对象的承诺,以查看何时完成处理并从之前的处理中获得任何结果。

如果它不在 Map 对象中,那么它现在没有被处理,您可以立即将它放入 Map 以将其标记为“处理中”。如果您将一个 Promise 设置为值,那么您可以使用从该对象处理中获得的任何结果来解决该 Promise。

出现的任何其他请求最终都会等待该承诺,因此您将只处理此 ID 一次。以该 ID 开头的第一个将处理它,并且在处理时出现的所有其他请求将使用相同的共享结果(从而节省您繁重计算的重复)。

我试图编写一个示例,但并没有真正理解您的伪代码试图做得足够好以提供代码示例。

这样的系统必须具有完美的错误处理,以便所有可能的错误路径都能正确处理Map 和嵌入在Map 中的承诺。

根据您相当简单的伪代码示例,这里有一个类似的伪代码示例来说明上述概念:

const itemInProcessCache = new Map();

app.get("/api", async function(req, res) {
    const itemID = req.query.itemID
    let gate = itemInProcessCache.get(itemID);
    if (gate) {
        gate.then(val => {
            // use cached result here from previous processing
        }).catch(err => {
            // decide what to do when previous processing had an error
        });
    } else {
        let p = DATABASE_UPDATE({itemID: true}).then(result => {
            // expensive processing done
            // return final value so any others waiting on the gate can just use that value
            // decide if you want to clear this item from itemInProcessCache or not
        }).catch(err => {
            // error on expensive processing

            // remove from the gate cache because we didn't get a result
            // expensive processing will have to be done by someone else
            itemInProcessCache.delete(itemID);
        });
        // mark this item as being processed
        itemInProcessCache.set(itemID, p);
    }
});

注意:这依赖于 node.js 的单线程性。在此处的请求处理程序返回之前,任何其他请求都无法启动,以便在此 itemID 的任何其他请求开始之前调用itemInProcessCache.set(itemID, p);


另外,我不太了解数据库,但这似乎很像一个好的多用户数据库可能已经内置的功能,或者具有使这更容易的支持功能,因为不想要的想法并不罕见让多个请求都尝试做相同的数据库工作(或者更糟糕的是,相互竞争)。

【讨论】:

  • 这只有在 OP 可以确定他不需要运行多个节点实例(例如通过集群或添加服务器)时才有效。我没有为它 -1,但这是一个很大的限制。
  • 正如@Paul 所说,我用 EDIT 更新了问题,希望问题更清楚。
  • @Paul - 是的,这是真的。您将不得不为多个实例使用共享存储,这会使事情变得复杂,因为对它的异步访问。到那时,应该在数据库中查找此类功能。
  • @lukas - 在你原来的问题中,你没有说过你试图为多个 node.js 实例解决这个问题。现在完全不同的问题。我想我很抱歉我花了任何时间来回答问题。去找一个内置这种能力的数据库。这正是你想要的。
  • @lukas - 正如我在回答中所说,我不太了解数据库的这方面,但我知道这是某些数据库的一个功能领域,因为并发管理对于数据库。我建议你开始阅读一些文章here
猜你喜欢
  • 2020-04-06
  • 1970-01-01
  • 1970-01-01
  • 2020-11-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多