【问题标题】:How to write a live data set to disk with async I/O?如何使用异步 I/O 将实时数据集写入磁盘?
【发布时间】:2014-10-30 23:35:36
【问题描述】:

我是使用 node.js 开发的新手(尽管在客户端 javascript 方面经验相对丰富),并且在处理 node.js 中的异步操作时遇到了很多关于良好实践的问题。

我的具体问题(尽管我认为这是一个相当通用的主题)是我有一个 node.js 应用程序(在 Raspberry Pi 上运行),它每 10 秒将多个温度探测器的读数记录到内存中数据结构。这工作得很好。数据随着时间的推移在内存中累积,当它累积并达到特定大小阈值时,数据会定期老化(仅保留最后 N 天的数据)以防止其增长超过特定大小。此温度数据用于控制其他一些电器。

然后,我有一个单独的间隔计时器,它会每隔一段时间将这些数据写入磁盘(如果进程崩溃,则将其保留)。我正在使用异步 node.js(fs.open()fs.write()fs.close())磁盘 IO 将数据写入磁盘。

而且,由于磁盘 IO 的异步特性,我突然想到,我正尝试写入磁盘的数据结构可能会在我将其写入磁盘的过程中被修改。那可能是一件坏事。如果数据仅在写入磁盘时附加到数据结构中,那实际上不会导致我写入数据的方式出现问题,但在某些情况下,可以在记录新数据时修改早期数据这真的会破坏我正在写入磁盘的完整性。

我能想到可以在我的代码中添加的各种有点丑陋的保护措施,例如:

  1. 切换到同步 IO 以将数据写入磁盘(出于服务器响应的原因,不想这样做)。
  2. 在我开始写入数据时设置一个标志,并且在设置该标志时不记录任何新数据(导致我在写入期间丢失数据记录)。
  3. 选项 2 的更复杂版本,其中我设置了标志,当设置标志时,新数据进入一个单独的临时数据结构,当文件 IO 完成后,它与真实数据合并(可行,但似乎丑)。
  4. 获取原始数据的快照副本并花时间将该副本写入磁盘,因为知道没有其他人会修改该副本。我不想这样做,因为数据集比较大,而且我处于内存有限的环境(Raspberry PI)中。

所以,我的问题是,当其他操作可能想要在异步 IO 期间修改该数据时,使用异步 IO 编写大型数据集的设计模式是什么?除了上面列出的具体解决方法之外,是否还有更通用的方法来处理我的问题?

【问题讨论】:

  • 也许这在一定程度上有所帮助? stackoverflow.com/q/14795145/218196
  • @FelixKling - 那篇文章看起来很好地描述了 node.js 中的异步操作如何工作。我已经明白了。事实上,正是我对异步架构的理解告诉我我有一个并发问题要处理,这就是为什么我正在寻找解决该并发问题的良好实践设计模式。
  • 据我了解node还是单线程的,并发执行并不一定意味着并行执行,所以在读写数据应该不会有问题同时(因为它不可能发生,就像在浏览器中一样)。但这些都只是猜测,我其实也不确定,所以不要听我的(太多):)
  • 为什么人们投票结束这个问题?我在问如何使用异步 IO 编写实时数据。
  • @Bergi - 是的,我正在分块写入数据。出于内存使用的原因(这是一个没有大量 RAM 的小型 Raspberry Pi),我不想一次将其全部序列化到内存中。在异步写入之间可能会发生额外的传感器读取,我实际上已经在我的日志中看到了这种情况。

标签: javascript node.js asynchronous raspberry-pi promise


【解决方案1】:

您的问题是data synchronization。传统上这是通过locks/mutexes 解决的,但javascript/node 并没有真正内置的任何东西。

那么,我们如何在节点中解决这个问题? 我们使用队列。就我个人而言,我使用来自async modulequeue 函数。

队列的工作方式是保留需要执行的任务列表,并且仅在前一个任务完成后按照它们添加到队列的顺序执行这些任务(类似于您的选项 3)。

注意: async 模块的 queue 方法实际上可以同时运行多个任务(如上面的动画所示),但是由于我们在这里讨论数据同步,我们不希望这样.幸运的是,我们可以告诉它一次只运行一个。

在您的特定情况下,您需要设置一个可以执行两种任务的队列:

  1. 修改数据结构
  2. 将数据结构写入磁盘

每当您从温度探头获得新数据时,将任务添加到队列中,以使用新数据修改数据结构。然后,每当您的间隔计时器触发时,将任务添加到将数据结构写入磁盘的队列中。

由于队列一次只能运行一项任务,因此按照将它们添加到队列中的顺序,它可以保证您在将数据写入磁盘时永远不会修改内存中的数据结构。

一个非常简单的实现可能如下所示:

var dataQueue = async.queue(function(task, callback) {
    if (task.type === "newData") {
        memoryStore.add(task.data); // modify your data structure however you do it now
        callback(); // let the queue know the task is done; you can pass an error here as usual if needed
    } else if (task.type === "writeData") {
        fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
            // error handling
            callback(err); // let the queue know the task is done
        })
    } else {
        callback(new Error("Unknown Task")); // just in case we get a task we don't know about
    }
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

// call when you get new probe data
funcion addNewData(data) {
    dataQueue.push({task: "newData", data: data}, function(err) {
        // called when the task is complete; optional
    });
}

// write to disk every 5 minutes
setInterval(function() {
    dataQueue.push({task: "writeData", filename: "somefile.dat"}, function(err) {
        // called when the task is complete; optional
    });
}, 18000);

还请注意,您现在可以将数据添加异步到您的数据结构中。假设您添加了一个新探针,该探针在其值更改时触发事件。您可以像使用现有探针一样addNewData(data),而不必担心它与正在进行的修改或磁盘写入冲突(如果您开始写入数据库而不是内存数据存储,这真的会发挥作用) .


更新:使用bind()的更优雅的实现

这个想法是您使用bind() 将参数绑定到函数,然后将bind() 返回的新绑定函数推送到队列中。这样你就不需要将一些自定义对象推送到它必须解释的队列中;你可以给它一个调用函数,所有设置都已经有了正确的参数。 唯一需要注意的是该函数必须将回调作为其最后一个参数。

这应该允许您使用您拥有的所有现有功能(可能稍作修改)并在需要确保它们不会同时运行时将它们推送到队列中。

我把这个放在一起来测试这个概念:

var async = require('async');

var dataQueue = async.queue(function(task, callback) {
    // task is just a function that takes a callback; call it
    task(callback); 
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

function storeData(data, callback) {
    setTimeout(function() { // simulate async op
        console.log('store', data);
        callback(); // let the queue know the task is done
    }, 50);
}

function writeToDisk(filename, callback) {
    setTimeout(function() { // simulate async op
        console.log('write', filename);
        callback(); // let the queue know the task is done
    }, 250);
}

// store data every second
setInterval(function() {
    var data = {date: Date.now()}
    var boundStoreData = storeData.bind(null, data);
    dataQueue.push(boundStoreData, function(err) {
        console.log('store complete', data.date);
    })
}, 1000)

// write to disk every 2 seconds
setInterval(function() {
    var filename = Date.now() + ".dat"
    var boundWriteToDisk = writeToDisk.bind(null, filename);
    dataQueue.push(boundWriteToDisk, function(err) {
        console.log('write complete', filename);
    });
}, 2000);

【讨论】:

  • 这个方案和方案3没有什么不同。我们还是得想办法把从队列中传入的数据拼接到数据结构中,这样会变得很乱(我们不只是处理插入。 ..我们也在处理删除)。
  • @JohnKurlak - 我认为这可行。当我有新数据要添加到数据结构中时,我会在数据队列中放入一个项目(操作 ID 和数据)。如果当时没有其他事情发生,队列管理器只会运行操作(例如调用函数),然后立即将数据添加到本地存储中。如果当时正在进行保存操作,则排队的项目位于队列中。保存操作完成后,队列管理器可以将下一项从队列中拉出,并运行与保存操作未运行时完全相同的代码。
  • 使用.bind() 似乎是一个很好的简化改进。
  • 我实现了队列。实际上很简单,而且看起来效果很好。如果数据被锁定,并且调用了可以修改数据的函数之一,则这些方法中的每一个都在本地函数中实现(通过闭包自动访问函数参数),因此我执行该本地函数,如果数据未锁定,或者如果是,我将其排队。实现它最困难的部分是制定一个测试,该测试会导致(相对较低频率的)并发冲突经常发生,足以让您测试您编写的代码。
【解决方案2】:

首先 - 让我们展示一个实用的解决方案,然后深入了解它的工作原理和原因:

var chain = Promise.resolve(); // Create a resolved promise
var fs = Promise.promisifyAll(require("fs"));

chain = chain.then(function(){
    return fs.writeAsync(...); // A
});

// some time in the future
chain = chain.then(function(){
    return fs.writeAsync(...); // This will always execute after A is done
})

既然你已经用 Promise 标记了你的问题 - 值得一提的是,Promise 可以自己很好地解决这个(相当复杂的)问题,而且很容易做到。

您的数据同步问题称为the producer consumer 问题。有很多方法可以解决 JavaScript 中的同步问题 - this recent piece by Q's KrisKowal 是该主题的好读物。

输入:承诺

使用 Promise 解决问题的最简单方法是通过单个 Promise 链接所有内容。我知道你自己对 promise 很有经验,但对于新读者,让我们回顾一下:

Promises 是对排序本身的概念的抽象。承诺是单个(读取离散的)动作单元。链接承诺,很像某些语言中的;,记录一个操作的结束和下一个操作的开始。 JavaScript 中的 Promise 抽象了两个主要内容 - 动作需要时间和异常条件的概念。

这里有一个“更高”的抽象,称为 monad,而 A+ 承诺不严格遵守 monad 法则(为了方便),有一些承诺的实现可以做到。 Promise 抽象了某种处理,而 monad 抽象了处理本身的概念,您可以说 promise 是 monad 或至少它们是 monadic

Promises 以 pending 开始,这意味着它们代表一个已经开始但尚未完成的操作。在某个时候,他们可能会经历解决,在此期间他们安顿处于以下两种状态之一:

  • 已完成 - 表示操作已成功完成。
  • Rejected - 表示操作未成功完成。

一旦一个承诺被解决,它就不能再改变它的状态。就像您可以在下一行继续 ; 一样 - 您可以使用 .then 关键字继续承诺,它将上一个操作链接到下一个操作。

解决生产者-消费者。

生产者/消费者问题的传统解决方案可以通过像 Dijkstra 的信号量这样的传统并发结构来完成。事实上,这样的解决方案是通过承诺或简单的回调实现的,但我相信我们可以做类似的事情。

相反,我们将保持程序运行,并每次都向其附加新操作。

var fsQueue = Promise.resolve(); // start a new chain

// one place
fsQueue = fsQueue.then(function(){ // assuming promisified fs here
    return fs.writeAsync(...); 
});

// some other place
fsQueue = fsQueue.then(function(){
    return fs.writeAsync(...);
});

向队列中添加动作可确保我们已订购同步,并且动作只会在较早的动作完成后执行。这是解决此问题的最简单的同步解决方案,需要通过 .thenfs.asyncFunction 调用包装到您的队列中。

另一种解决方案是使用类似于“监视器”的东西 - 我们可以通过包装 fs 从内部确保访问是一致的:

var fs = B.promisifyAll(require("fs")); // bluebird promisified fs 
var syncFs = { // sync stands for synchronized, not synchronous
    queue: B.resolve();
    writeAsync = function(){
        var args = arguments
        return (queue = queue.then( // only execute later
            return fs.writeAsync.apply(fs,arguments);
        });
    } // promisify other used functions similarly
};

这将产生同步版本的 fs 操作。也可以使用类似的方法自动执行此操作(尚未测试):

// assumes module is promisified and ignores nested functions
function synchronize(module){
    var ret = {}, queue = B.resolve();
    for(var fn in module){
        ret[fn] = function(){
            var args = arguments;
            queue = queue.then(function(){
                return module[fn].apply(module, args); 
            })
        };
    }
    ret.queue = queue; // expose the queue for handling errors
    return ret;
}

这应该产生一个同步其所有动作的模块版本。请注意,我们获得了额外的好处,即错误不会被抑制并且文件系统不会处于不一致状态,因为在导致操作不执行的错误得到处理之前,操作不会被执行。

那是不是有点类似于队列?

是的!队列通过为操作提供先进先出结构来做非常相似的事情(您可以在另一个答案中看到)。很像以该顺序开始执行的程序代码。在我看来,承诺只是同一枚硬币更强大的一面。

另一个答案也通过队列提供了一个可行的选择。

关于您建议的方法

切换到同步 IO 以将数据写入磁盘(出于服务器响应的原因,不想这样做)。

虽然我同意这是最简单的 - 将您需要在同一队列上同步的所有操作链接起来的“监控”方法非常相似。

在我开始写入数据时设置一个标志,并且在设置该标志时不记录任何新数据(导致我在写入过程中丢失数据记录)。

那个标志实际上是一个互斥体。如果您在有人重试时阻止(或让步并将操作放入队列),那么您就有了一个真正的互斥锁,它拥有“互斥保证”。

使用该标志重试,并保留一个包含该标志的下一步操作列表实际上在信号量的实现中非常常见 - 一个例子是在 linux 内核中。

选项 2 的更复杂版本,我设置了标志,当设置标志时,新数据进入一个单独的临时数据结构,当文件 IO 完成后,它与真实数据合并(可行,但似乎丑陋的)。获取原始数据的快照副本,并花时间将该副本写入磁盘,因为知道没有其他人会修改该副本。我不想这样做,因为数据集比较大,而且我处于内存有限的环境(Raspberry PI)中。

这些方法通常被称为事务性 RCU 更新,在某些情况下它们实际上非常现代且非常快速 - 例如对于“读写器问题”(这与您所拥有的非常相似)。对这些的本地支持最近在 linux 内核中启动。在某些情况下这样做实际上既可行又高效,尽管在你的情况下有点像你建议的那样过于复杂。

所以,总结一下

  • 这不是一个简单的问题,而是一个有趣的问题。
  • 幸运的是,promise 很好地解决了这个问题,它们的构建正是为了通过抽象序列的概念来解决这类问题。

快乐的编码,Pi NodeJS 项目听起来很棒。如果我能进一步澄清这一点,请告诉我。

【讨论】:

  • 感谢您解释如何在这方面使用 Promise,但它们似乎不像简单的队列那样适合,所以我选择了队列。我到处都在使用 Promise 来管理异步 IO(即使使用 Promise,写起来也很痛苦)。至少对我的 IO 使用的承诺使错误处理更易于管理。
  • 您可以随意使用(或不使用)它,但在我看来,如果您已经使用async 的队列另外使用承诺是多余的,因为您已经得到了这个来自承诺的免费功能。
猜你喜欢
  • 2015-05-22
  • 2018-05-12
  • 2019-11-22
  • 2013-09-18
  • 2018-03-17
  • 2019-12-07
  • 2014-05-20
  • 2011-03-12
  • 1970-01-01
相关资源
最近更新 更多