【问题标题】:Batch request and process in parallel in Node with async iterators使用异步迭代器在 Node 中并行批处理请求和处理
【发布时间】:2021-11-04 04:24:21
【问题描述】:

我有一个小函数可以将日志发送给它。在函数中,我试图将其发送到 AWS Cloudwatch,但是我遇到了一些问题。一旦发送了一个日志,我就无法发送另一个日志,直到第一个日志完成,因为我需要下一个 sequenceToken,以便下一个日志知道在哪里添加自己。我知道异步迭代器是这里的关键,但不确定如何在我的代码中实现它们。这是我当前的代码,在发送第一个日志后失败:

const build = require('pino-abstract-stream');

const stream = async (options) => {
    // Creates the AWS connection
    const client = await createClient();
 
    // Gets the first token
    let sequenceToken = await getInitSequenceToken(client);

    return build(function (source) {
        source.on("data", async function (obj) {
            // Every time the log is sent to my worker thread it prints out here. Obj contains the info from the log
            const command = new PutLogEventsCommand({
                logGroupName: "api",
                logStreamName: `executive-${Config.env}-${Config.location}`,
                logEvents: [
                    {
                        message: obj.msg,
                        timestamp: obj.time,
                    },
                ],
                sequenceToken,
            });

            // Here I am sending the log to Clouwatch 
            const response = await client.send(command);
        
            // Here I was updating the token but this fails as the next log is already sending 
            sequenceToken = response.nextSequenceToken;
        });
    });
};

【问题讨论】:

  • 什么是source,一个websocket?
  • 使用自定义传输的 pino 日志:github.com/pinojs/pino-abstract-transport
  • 使用该链接中的第一个示例,即带有for await 的示例,即“同步”版本。
  • 我知道我可以使用第一个示例并一次发送一个请求,但肯定会更好地对它们进行批处理,而这正是我不知道该怎么做的地方。
  • 您可以将response 存储在外部变量(数组)中,然后一旦流关闭(source.on('end')),您将循环使用reduce over the client.send

标签: javascript node.js amazon-web-services asynchronous async-await


【解决方案1】:

您可以等待流 (source) 完成,并将所有对象存储到一个数组中,然后通过一个请求发送它们 - 看到 logEvents 接受一个包含多个对象的数组 - 这可能是更好的解决方案因为您在对象上有时间戳,并且您会将它们全部分组在一个 sequenceToken 下:

const build = require('pino-abstract-stream');

const stream = async(options) => {
  // Creates the AWS connection
  const client = await createClient();

  // Gets the first token
  let sequenceToken = await getInitSequenceToken(client);
  let storeStreamObjects = [];
    
  return build(function(source) {
    source.on("data", async function(obj) {
      storeStreamObjects.push({
        message: obj.msg,
        timestamp: obj.time,
      });
    });
    source.on("close", async () => {

      const command = new PutLogEventsCommand({
        logGroupName: "api",
        logStreamName: `executive-${Config.env}-${Config.location}`,
        logEvents: storeStreamObjects, // all objects are here
        sequenceToken,
      });

      return await client.send(command);

    })
  });
};

【讨论】:

  • 不敢相信我没有看到这个解决方案,典型的过度思考问题。谢谢
  • 在我看来,除非服务器关闭,否则流不会结束。在服务器处于活动状态的整个过程中,都会有数据被发送到流中
  • 我已将完成事件从 end 更改为 close,因为他们说这是一个 split2 实例。
【解决方案2】:

如果以后可以将日志发送到云端,您可以执行以下操作。

const build = require("pino-abstract-stream");

const stream = async options => {
  // Creates the AWS connection
  const client = await createClient();

  // Gets the first token
  let sequenceToken = await getInitSequenceToken(client);

  const steamData = [];

  return build(function (source) {
    source.on("data", async function (obj) {
      steamData.push(obj);
    });

    source.on("end", async function () {
      // Every time the log is sent to my worker thread it prints out here. Obj contains the info from the log
      for (const data of steamData) {
        const command = new PutLogEventsCommand({
          logGroupName: "api",
          logStreamName: `executive-${Config.env}-${Config.location}`,
          logEvents: [
            {
              message: data.msg,
              timestamp: data.time,
            },
          ],
          sequenceToken,
        });

        // Here I am sending the log to Clouwatch
        const response = await client.send(command);

        sequenceToken = response.nextSequenceToken;
      }
    });
  });
};

【讨论】:

    猜你喜欢
    • 2017-12-26
    • 1970-01-01
    • 2019-05-13
    • 2021-10-08
    • 2012-05-20
    • 2020-12-20
    • 1970-01-01
    • 2018-11-26
    • 1970-01-01
    相关资源
    最近更新 更多