【问题标题】:Trouble extracting data from a NodeJS stream in a Lambda从 Lambda 中的 NodeJS 流中提取数据时遇到问题
【发布时间】:2020-03-07 01:48:59
【问题描述】:

我是第一次使用流,我在从可读流中提取数据时遇到了一些问题。

我正在使用 pgpg-copy-streams 从 PSQL DB 中提取大量数据作为流,目的是使用数据库中的数据创建一个 CSV 文件。

这是我的代码:

const aws = require('aws-sdk');
const {Client} = require('pg'); //  Needs the nodePostgres Lambda Layer
const copyTo = require('pg-copy-streams').to;

exports.handler = async (event) => {
let response = {};

console.log(JSON.stringify(event));

const client = new Client();

const deviceId = event.deviceId;
const fromDate = event.fromDate;
const toDate = event.toDate;

if (!deviceId) { // if we do not have a device id, just bail.
    return response = {
        statusCode: 400,
        body: "No device Id",
    };
}

const tempTableQuery = getQuery(deviceId, fromDate, toDate);
console.log("Search query: " + tempTableQuery);
try {

    await client.connect();

    await client.query(tempTableQuery);

    const q = `COPY temp_csv_table to STDOUT with csv DELIMITER ';'`;
    const dataStream = client.query(copyTo(q));

    // dataStream.pipe(console.log(process.stdout));
    dataStream.on('readable', function() {
        // There is some data to read now.
        let data;

        while (data = this.read()) {
            console.log(data); //<- this dosent print anything :(
        }
    });

    dataStream.on('error', async function (err) {
        // Here we can control stream errors
        await client.end();
    });
    dataStream.on('end', async function () {
        await client.end();
    });


} catch (e) {
    response = {
        statusCode: 500,
        result: "Error: " + e
    };
} finally {
    client.end();
}
};

function getQuery(deviceId, fromDate, toDate) {
return `CREATE TEMPORARY TABLE temp_csv_table AS
            SELECT * 
            FROM sensor_data_v2 
            WHERE device_id = '${deviceId}' and 
                  time_stamp between '${fromDate}' and '${toDate}' 
            LIMIT 10;`;
}

问题:

  1. 如何从数据流中提取行?
  2. 有更好的方法吗?

注意事项:

  1. 在 AWS Lambda NodeJS 10.x 运行时上运行。
  2. 我知道表格中有我指定的过滤器的数据。
  3. 我已经为此测试设置了 LIMIT 10,这些条件将返回 2600 行数据。
  4. 我将使用csv-write-stream 包来制作一个包含来自数据库的数据的CSV 文件。没有真正附加到这个包,如果更容易使用,很高兴使用另一个 CSV 编写器。

【问题讨论】:

  • 你的代码在运行吗?您在不允许的异步函数之外使用 await client.connect()。我很困惑,也许我错过了什么。
  • 这里我们应该在函数中传递“数据”。 dataStream.on('可读', function(data){ console.log(data) })
  • @khan 是的,运行得很好。第 4 行“exports.handler = async (event)”
  • @AlexanderG.M.我用你的 sn-p 试过了,控制台上还是什么都没有。

标签: node.js amazon-web-services aws-lambda node-streams


【解决方案1】:

我试图复制这个,我认为你的问题实际上是在 try-catch-finally 块上。

因此,当您的代码到达const dataStream = client.query(copyTo(q)) 部分时,它会启动与promise 无关的流式处理,因此执行直接转到finally 块。但是,在此 finally 块中,您会终止客户端,因此数据流(仍在运行)将收到错误 Error: Connection terminated,因为流需要一些时间。

要解决这个问题,您可以简单地从 finally 块中删除 client.end() 并将该客户端终止放入 catch 块和流 end 事件处理程序中,然后它应该可以工作。

您也可以在readable 事件期间执行console.log(data.toString()),否则打印出来的数据将是二进制的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-31
    • 2021-10-12
    • 1970-01-01
    • 2021-08-08
    • 2015-01-21
    • 1970-01-01
    • 2016-10-20
    • 1970-01-01
    相关资源
    最近更新 更多