首先 - 让我们展示一个实用的解决方案,然后深入了解它的工作原理和原因:
var chain = Promise.resolve(); // Create a resolved promise
var fs = Promise.promisifyAll(require("fs"));
chain = chain.then(function(){
return fs.writeAsync(...); // A
});
// some time in the future
chain = chain.then(function(){
return fs.writeAsync(...); // This will always execute after A is done
})
既然你已经用 Promise 标记了你的问题 - 值得一提的是,Promise 可以自己很好地解决这个(相当复杂的)问题,而且很容易做到。
您的数据同步问题称为the producer consumer 问题。有很多方法可以解决 JavaScript 中的同步问题 - this recent piece by Q's KrisKowal 是该主题的好读物。
输入:承诺
使用 Promise 解决问题的最简单方法是通过单个 Promise 链接所有内容。我知道你自己对 promise 很有经验,但对于新读者,让我们回顾一下:
Promises 是对排序本身的概念的抽象。承诺是单个(读取离散的)动作单元。链接承诺,很像某些语言中的;,记录一个操作的结束和下一个操作的开始。 JavaScript 中的 Promise 抽象了两个主要内容 - 动作需要时间和异常条件的概念。
这里有一个“更高”的抽象,称为 monad,而 A+ 承诺不严格遵守 monad 法则(为了方便),有一些承诺的实现可以做到。 Promise 抽象了某种处理,而 monad 抽象了处理本身的概念,您可以说 promise 是 monad 或至少它们是 monadic。
Promises 以 pending 开始,这意味着它们代表一个已经开始但尚未完成的操作。在某个时候,他们可能会经历解决,在此期间他们安顿处于以下两种状态之一:
-
已完成 - 表示操作已成功完成。
-
Rejected - 表示操作未成功完成。
一旦一个承诺被解决,它就不能再改变它的状态。就像您可以在下一行继续 ; 一样 - 您可以使用 .then 关键字继续承诺,它将上一个操作链接到下一个操作。
解决生产者-消费者。
生产者/消费者问题的传统解决方案可以通过像 Dijkstra 的信号量这样的传统并发结构来完成。事实上,这样的解决方案是通过承诺或简单的回调实现的,但我相信我们可以做类似的事情。
相反,我们将保持程序运行,并每次都向其附加新操作。
var fsQueue = Promise.resolve(); // start a new chain
// one place
fsQueue = fsQueue.then(function(){ // assuming promisified fs here
return fs.writeAsync(...);
});
// some other place
fsQueue = fsQueue.then(function(){
return fs.writeAsync(...);
});
向队列中添加动作可确保我们已订购同步,并且动作只会在较早的动作完成后执行。这是解决此问题的最简单的同步解决方案,需要通过 .then 将 fs.asyncFunction 调用包装到您的队列中。
另一种解决方案是使用类似于“监视器”的东西 - 我们可以通过包装 fs 从内部确保访问是一致的:
var fs = B.promisifyAll(require("fs")); // bluebird promisified fs
var syncFs = { // sync stands for synchronized, not synchronous
queue: B.resolve();
writeAsync = function(){
var args = arguments
return (queue = queue.then( // only execute later
return fs.writeAsync.apply(fs,arguments);
});
} // promisify other used functions similarly
};
这将产生同步版本的 fs 操作。也可以使用类似的方法自动执行此操作(尚未测试):
// assumes module is promisified and ignores nested functions
function synchronize(module){
var ret = {}, queue = B.resolve();
for(var fn in module){
ret[fn] = function(){
var args = arguments;
queue = queue.then(function(){
return module[fn].apply(module, args);
})
};
}
ret.queue = queue; // expose the queue for handling errors
return ret;
}
这应该产生一个同步其所有动作的模块版本。请注意,我们获得了额外的好处,即错误不会被抑制并且文件系统不会处于不一致状态,因为在导致操作不执行的错误得到处理之前,操作不会被执行。
那是不是有点类似于队列?
是的!队列通过为操作提供先进先出结构来做非常相似的事情(您可以在另一个答案中看到)。很像以该顺序开始执行的程序代码。在我看来,承诺只是同一枚硬币更强大的一面。
另一个答案也通过队列提供了一个可行的选择。
关于您建议的方法
切换到同步 IO 以将数据写入磁盘(出于服务器响应的原因,不想这样做)。
虽然我同意这是最简单的 - 将您需要在同一队列上同步的所有操作链接起来的“监控”方法非常相似。
在我开始写入数据时设置一个标志,并且在设置该标志时不记录任何新数据(导致我在写入过程中丢失数据记录)。
那个标志实际上是一个互斥体。如果您在有人重试时阻止(或让步并将操作放入队列),那么您就有了一个真正的互斥锁,它拥有“互斥保证”。
使用该标志重试,并保留一个包含该标志的下一步操作列表实际上在信号量的实现中非常常见 - 一个例子是在 linux 内核中。
选项 2 的更复杂版本,我设置了标志,当设置标志时,新数据进入一个单独的临时数据结构,当文件 IO 完成后,它与真实数据合并(可行,但似乎丑陋的)。获取原始数据的快照副本,并花时间将该副本写入磁盘,因为知道没有其他人会修改该副本。我不想这样做,因为数据集比较大,而且我处于内存有限的环境(Raspberry PI)中。
这些方法通常被称为事务性 RCU 更新,在某些情况下它们实际上非常现代且非常快速 - 例如对于“读写器问题”(这与您所拥有的非常相似)。对这些的本地支持最近在 linux 内核中启动。在某些情况下这样做实际上既可行又高效,尽管在你的情况下有点像你建议的那样过于复杂。
所以,总结一下
- 这不是一个简单的问题,而是一个有趣的问题。
- 幸运的是,promise 很好地解决了这个问题,它们的构建正是为了通过抽象序列的概念来解决这类问题。
快乐的编码,Pi NodeJS 项目听起来很棒。如果我能进一步澄清这一点,请告诉我。