【发布时间】:2018-12-02 17:55:33
【问题描述】:
我每秒都从 Kafka 接收数据行。对于每批数据,我都插入到我的数据库中。
我的应用一直在读取每批的最后一个 message 和 id。这里的问题是,promise 不是串联运行,而是在一批完成后并发运行,并且它们一直读取相同的 message 和 id。我希望每个 promise 都有自己的 message 和 id,由它们从第一个函数中的 for 循环进入的顺序定义。
我认为我需要使用闭包,但我不确定如何在此处应用它们。 我不想使用计时器!
谢谢!
// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
for (var i = 0; i < batchOfRows.rows.length; i++) {
validate(batchOfRows.rows[i])
.then(result => console.log(result))
.catch(error => console.log(error));
}
});
// For each row received, give it an ID and then insert into the DB
function validate(data) {
return new Promise((resolve, reject) => {
message = data;
id = message.date + message.location
DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
.then(result => {
// Insert into the table at this ID
insertIntoDB(message, id)
.then(result => resolve(result))
.catch(error => reject(error));
})
.catch(error => {
reject(error);
});
});
}
// Inserting into DB
function insertIntoDB(message, id) {
return new Promise((resolve, reject) => {
query = "insert into table2 where id = ? and messageBody = ?";
DB.execute(query, [id, JSON.Stringify(message)])
.then(result => resolve("Successfully inserted message ID " + id))
.catch(error => reject("Error inserting!"));
});
}
编辑(danh 的解决方案):
var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
client, [{
topic: 'my_topic',
partition: 0,
offset: 0
}], {
fromOffset: false
}
);
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(element.map(processElement)).then(elementResult => {
// results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
results = [];
queue.shift();
});
});
}
batchOfRows.on('message', function (data) {
console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
queue.push(batchOfRows.rows);
processQueue();
});
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
.then(result => {
// Pushing the result here
results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
console.log("Test") // On the first batch prints "Test" 72 times right away
});
}
编辑 我通过添加 element.map(processUpdate) 稍微修改了 processQueue 函数,因为从 batchOfRows 接收的批次实际上是数组,我需要对该数组中的每个项目执行数据库查询。
我还删除了 results.push(elementResult) 因为 elementResult 由于某种原因实际上是未定义的。我已将 results.push(elementResult) 移至 insertIntoDB 并将其命名为 results.push(result)。这可能是错误的来源(我不知道如何将insertIntoDB的结果返回给调用promise函数processQueue)。
如果你看一下 insertIntoDB,如果我 console.log("test") 它将打印 test 与 batchOfRows 数组中的元素相同的次数,表示它已经解决了该批次中的所有承诺。因此,在第一批/消息中,如果有 72 行,它将打印“测试”72 次。但是,如果我将console.log(“Test”)更改为简单的results.push(result),甚至results.push(“test”),然后打印results.length它仍然会给我0,直到第二批完成即使我预计长度为 72。
【问题讨论】:
-
当我们讨论这篇文章时,我有一个担心:同时访问一个队列。请参阅此帖子(由 SO 上最杰出的承诺标签人员参加,但未得到答复)。 stackoverflow.com/questions/26756463/… 我建议将所有数据写入数据库,然后在单独的进程上处理数据库持久队列的一个原因是让数据库强制执行这些操作的原子性。我认为这将是一个比我提出的更专业的解决方案
-
更直接地说,我建议的解决方案能否在队列上运行
push的同时另一个承诺正在运行shift?
标签: javascript node.js cassandra promise