【问题标题】:Promise closure within loop循环内的承诺关闭
【发布时间】:2018-12-02 17:55:33
【问题描述】:

我每秒都从 Kafka 接收数据行。对于每批数据,我都插入到我的数据库中。

我的应用一直在读取每批的最后一个 messageid。这里的问题是,promise 不是串联运行,而是在一批完成后并发运行,并且它们一直读取相同的 messageid。我希望每个 promise 都有自己的 messageid,由它们从第一个函数中的 for 循环进入的顺序定义。

我认为我需要使用闭包,但我不确定如何在此处应用它们。 我不想使用计时器!

谢谢!

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
    for (var i = 0; i < batchOfRows.rows.length; i++) {
        validate(batchOfRows.rows[i])
            .then(result => console.log(result))
            .catch(error => console.log(error));
    }
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    return new Promise((resolve, reject) => {
        message = data;
        id = message.date + message.location
        DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
            .then(result => {
                // Insert into the table at this ID
                insertIntoDB(message, id)
                    .then(result => resolve(result))
                    .catch(error => reject(error));
            })
            .catch(error => {
                reject(error);
            });
    });
}

// Inserting into DB
function insertIntoDB(message, id) {
    return new Promise((resolve, reject) => {
        query = "insert into table2 where id = ? and messageBody = ?";

        DB.execute(query, [id, JSON.Stringify(message)])
            .then(result => resolve("Successfully inserted message ID " + id))
            .catch(error => reject("Error inserting!"));
    });
}

编辑(danh 的解决方案):

var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
    client, [{
        topic: 'my_topic',
        partition: 0,
        offset: 0
    }], {
        fromOffset: false
    }
);

let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(element.map(processElement)).then(elementResult => {
            // results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
            console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
            results = [];  
            queue.shift();
        });
    });
}

batchOfRows.on('message', function (data) {
    console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
    queue.push(batchOfRows.rows);
    processQueue();
});

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
        .then(result => {
            // Pushing the result here
            results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
            console.log("Test") // On the first batch prints "Test" 72 times right away
        });
}

编辑 我通过添加 element.map(processUpdate) 稍微修改了 processQueue 函数,因为从 batchOfRows 接收的批次实际上是数组,我需要对该数组中的每个项目执行数据库查询。

我还删除了 results.push(elementResult) 因为 elementResult 由于某种原因实际上是未定义的。我已将 results.push(elementResult) 移至 insertIntoDB 并将其命名为 results.push(result)。这可能是错误的来源(我不知道如何将insertIntoDB的结果返回给调用promise函数processQueue)。

如果你看一下 insertIntoDB,如果我 console.log("test") 它将打印 test 与 batchOfRows 数组中的元素相同的次数,表示它已经解决了该批次中的所有承诺。因此,在第一批/消息中,如果有 72 行,它将打印“测试”72 次。但是,如果我将console.log(“Test”)更改为简单的results.push(result),甚至results.push(“test”),然后打印results.length它仍然会给我0,直到第二批完成即使我预计长度为 72。

【问题讨论】:

  • 当我们讨论这篇文章时,我有一个担心:同时访问一个队列。请参阅此帖子(由 SO 上最杰出的承诺标签人员参加,但未得到答复)。 stackoverflow.com/questions/26756463/… 我建议将所有数据写入数据库,然后在单独的进程上处理数据库持久队列的一个原因是让数据库强制执行这些操作的原子性。我认为这将是一个比我提出的更专业的解决方案
  • 更直接地说,我建议的解决方案能否在队列上运行push 的同时另一个承诺正在运行shift

标签: javascript node.js cassandra promise


【解决方案1】:

您的代码中有各种反模式。首先,您不需要手动创建承诺,可能您永远不需要调用new Promise。其次,您通过不从 onFulfill 处理程序中返回嵌套的承诺来破坏承诺链。最后你在不声明变量时污染了全局范围id = message.date + message.location

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
let pending = Promise.resolve([]); // previous batch starting w/ resolved promise
batchOfRows.on('message', function (data) {
    // not sure where was batchRows comming from in your code
    const nextBatch = () => Promise.all(
      data.batchOfRows.rows.map(validate)
    );

    // reassign pending to a new promise
    // whatever happend to previous promise we keep running
    pending = pending
      .then(nextBatch)
      .catch(e => console.error(e))
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

// Inserting into DB
function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
}

【讨论】:

  • 非常感谢!你介意用英语向我解释一下 const nextBatch = () => Promise.all(data.batchOfRows.rows.map(validate));作品?匿名函数有什么用?另外,你在返回的 Promise 中多次执行 () => result,这是为了什么,它们最终会在哪里?
  • @stark0323,刚刚阅读了这个帖子,只是想提出一个更明确的数据结构(至少为了清楚起见),但看起来你已经解决了。这个答案声明了一个名为nextBatch 的小辅助函数,它通过在batchOfRows.rows 的每个元素(map)上调用验证来构建一个承诺数组,然后将该承诺数组传递给Promise.all()
  • @danh 我(也许还有很多其他人)肯定会发现您的解决方案很有用(如果不是更有用的话)。为了清楚起见,您介意分享您的代码吗?
  • 我同意这个答案。我在下面添加了一个,它用更多的数据明确表示了目标,但是从这里的代码中借用了,这就是为什么我建议将这个标记为正确的原因。
【解决方案2】:

稍微抽象一下这些想法,并在数据中明确表示它们(而不是在承诺中隐含保留的数据)可能会有所帮助。从队列开始:

let queue = [];

使用queue.push(element) 将内容添加到队列中,并使用element = queue.shift() 按到达顺序获取和删除

我们的目标是按顺序处理队列中的任何内容,按顺序保存结果。处理本身是异步的,我们希望在开始下一个队列项之前完成一个队列项,因此我们需要一系列 Promise(称为 promises)来处理队列:

let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(processElement(element)).then(elementResult => {
            results.push(elementResult);
            queue.shift();
        });
    });
}

我们可以说服自己这是正确的,甚至不用考虑processElement() 做了什么,只要它返回一个承诺。 (在 OP 案例中,该承诺是处理“行”数组的承诺)。 processElement() 会做这件事,结果(在 OP 案例中是一组结果)将被推送到 results

确信操作的顺序是有意义的,当新批次到达时,将其添加到队列中,然后处理队列中的任何内容:

batchOfRows.on('message', function (data) {
    queue.push(batchOfRows.rows);
    processQueue();
});

我们只需要定义processElement()。在这里使用@YuryTarabanko 的有用建议(并将他的答案标记为正确,IMO)

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
}

这样做的一个很好的副作用是您可以衡量进度。如果输入到达太快,则表达式:

queue.length - results.length

...会随着时间的推移而增长。

编辑 看着较新的代码,我很困惑为什么要对每一行(batchOfRows.rows 中的每个元素)进行查询。由于该查询的结果被忽略,所以不要这样做......

function processElement(data) {
    const id = data.date + data.location
    // we know everything we need to know to call insert (data and id)
    // just call it and return what it returns :-)
    return insertIntoDB(data, id);
}

我现在明白这将是一项长期运行的任务,它不应该累积结果(即使是线性的)。更干净的解决方法是删除我建议的对 results 数组的所有引用。 insert 的最小版本只是插入并返回插入的结果...

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)]);
}

我认为你添加了一些代码来记录结果(更好的测试是通过一些外部进程检查数据库,但如果你想记录,请记住pass-through记录后的结果值。

anyPromise.then(result => {
    console.log(result);
    return result;  // IMPORTANT
})

【讨论】:

  • 感谢这个伟大的解决方案!我相信很多人会发现这个解决方案和我一样有用。我其实也打算实现这个方法。
  • 一个问题:长时间运行是否可行? (可能永远)?记忆会成为问题吗?
  • 内存增长将是批次到达率与处理所需时间的(非常非线性)函数。如果您可以在下一批到达之前完成一批(平均而言),那么内存根本不会增长。但是,如果新批次在前一批完成之前出现,内存将呈指数增长。
  • 我的猜测也可以。尝试使用日志记录来查看队列是否增长。
  • 我不确定。再次查看代码,我不太了解传递给.on('message 的匿名函数的数据参数,函数(数据)...` 我也没有看到batchOfRows 的定义。也许,为问题添加更多上下文?
猜你喜欢
  • 1970-01-01
  • 2016-03-17
  • 1970-01-01
  • 2017-12-18
  • 2017-10-30
  • 2022-12-08
  • 2017-12-16
  • 2015-09-09
  • 2018-04-28
相关资源
最近更新 更多