【问题标题】:Parsing large CSV and streaming rows of promises解析大型 CSV 并流式传输承诺行
【发布时间】:2021-06-05 21:06:09
【问题描述】:

在尝试流式传输 csv、为每一行发出 http 请求以及让所有内容以“正确”顺序执行并记录到控制台时有点混乱。最终,我认为我没有正确地包装我的承诺,或者......?

const getUserByEmail = async (email) => {
  const encodedEmail = encodeURIComponent(email);

  try {
    const response = await http.get(`users?email=${encodedEmail}`);
    const userId = response.data.data[0] && response.data.data[0].id;

    return (userId ? userId : `${email} not found`);
  } catch (error) {
    console.error('get user error: ', error);
  }
};

const run = async () => {
  console.log('==> Reading csv ...');

  const promises = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error('stream error: ', error))
    .on('data', (row) => {
      promises.push(getUserByEmail(row.email));
    })
    .on('end', rowCount => {
      console.log(`==> Parsed ${rowCount} rows from csv ...`);
    })

  await Promise.all(promises)
    .then(values => console.log(values))

  console.log('==> End of script')
};

run();

我正在尝试/期望上面的代码获取 csv 的每一行,将每个 http 调用(一个 Promise)推送到一组 Promise 中,并让所有内容按照我期望的顺序执行/记录到控制台。

这是我的实际输出:

==> Reading csv...
[]
==> End of script
==> Parsed 10 rows from csv ...

这就是我所期待的:

==> Reading csv...
==> Parsed 10 rows from csv ...
[
  QyDPkn3WZp,
  e75KzrqYxK,
  iqDXoEFMZy,
  PstouMRz3y,
  w188hLyeT6,
  g18oxMOy6l,
  8wjVJutFnh,
  fakeEmail@fakeDomain.com not found,
  QEHaG3cp7d,
  y8I4oX6aCe
]
==> End of script

对我来说最大的问题是在“==> 脚本结束”之后记录了任何内容,这向我表明,我无法很好地掌握所有先前事件何时/为什么按它们的顺序记录是。

最终——我还没有做到——我还想将这些请求缓冲/计时到每分钟 100 个,否则我将受到这个特定 API 的速率限制。

谢谢!

【问题讨论】:

  • “正确”顺序是指您希望每个 http 请求按照它们在表中出现的顺序进行吗?如果是这样,Promise.all( 会同时执行所有的 Promise,所以它们完成的顺序基本上是随机的。您可以遍历每一行并await 以完成请求。
  • 感谢您查看@QuinnFreedman — 非常感谢。我本来可以在这方面更清楚。表中的顺序无关紧要——通过“正确”顺序,我主要关心的是在我的“脚本结束”控制台日志之后,流仍在记录。

标签: javascript csv async-await promise fs


【解决方案1】:

一直到await Promise.all(promises) 的孔 readStream 是同步的 - data 事件是异步的,并在另一个事件循环中填充承诺 因此,当您调用 Promise.all 时,promises 是一个空数组 - 您无需等待流结束。您可能希望将您的逻辑放在结束事件中,而不是像这样

const run = async () => {
  console.log('==> Reading csv ...');

  const promises = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error('stream error: ', error))
    .on('data', (row) => {
      promises.push(getUserByEmail(row.email));
    })
    .on('end', async rowCount => {
      await Promise.all(promises)
        .then(values => console.log(values))

      console.log('==> End of script')
    })
}

另一种更简单的方法是使用异步迭代器 readStream 有一个可以使用的symbol.asyncIterator

const run = async () => {
  console.log('==> Reading csv ...');

  let rowCount = 0
  const promises = []
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
  
  for await (let row of readStream) {
    rowCount++
    promises.push(getUserByEmail(row.email));
  }
    
  console.log(`==> Parsed ${rowCount} rows from csv ...`)

  await Promise.all(promises).then(console.log)

  console.log('==> End of script')
}

我会进一步限制并发性并这样做:

const run = async () => {
  console.log('==> Reading csv ...');

  const result = []
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
  
  for await (let row of readStream) {
    result.push(await getUserByEmail(row.email))
  }

  console.log(result)
  console.log('==> End of script')
}

如果您想提高异步迭代器的并发性,请查看this post,但要小心。使用此方法时结果可能会出现问题

【讨论】:

  • 感谢无尽的解释和所有三个例子!完美运行!
猜你喜欢
  • 2016-03-12
  • 2021-11-08
  • 2017-09-23
  • 2020-02-17
  • 2017-05-18
  • 1970-01-01
  • 2017-11-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多