【问题标题】:How do I read the contents of a Node.js stream into a string variable?如何将 Node.js 流的内容读入字符串变量?
【发布时间】:2012-05-24 07:49:38
【问题描述】:

我正在破解一个 Node 程序,该程序使用smtp-protocol 来捕获 SMTP 电子邮件并处理邮件数据。该库将邮件数据作为流提供,我不知道如何将其转换为字符串。

我目前正在使用stream.pipe(process.stdout, { end: false }) 将其写入标准输出,但正如我所说,我需要字符串中的流数据,一旦流结束我就可以使用它。

如何将 Node.js 流中的所有数据收集到字符串中?

【问题讨论】:

  • 您应该复制流或使用 (autoClose: false) 标记它。污染内存是不好的做法。

标签: javascript node.js stream


【解决方案1】:

您对此有何看法?

async function streamToString(stream) {
    // lets have a ReadableStream as a stream variable
    const chunks = [];

    for await (const chunk of stream) {
        chunks.push(Buffer.from(chunk));
    }

    return Buffer.concat(chunks).toString("utf-8");
}

【讨论】:

  • 必须使用chunks.push(Buffer.from(chunk)); 才能使用字符串块。
  • 哇,这看起来很整洁!这有什么问题吗(除了上面评论中提到的问题)?它可以处理错误吗?
  • 这是现代等价于最佳答案。 Whew Node.js/JS 变化很快。我建议使用这个,而不是评价最高的,因为它更干净,并且不会让用户不得不触摸事件。
  • @DirkSchumacher 您的 IDE 要么使用过时的脚本解释器 (for await is a valid ECMAScript syntax),要么在尝试(未成功)执行某些包含 for await 的代码时本身已过时。它是哪个IDE?无论如何,IDE 并非旨在实际运行“在生产中”的程序,它们会在开发过程中对它们进行 lint 并帮助进行分析。
  • @DirkSchumacher 不打扰。看看您能否确切地找出您的 IDE 的哪个组件——我假设它是一个程序——加载并执行包含for await 的脚本。查询程序的版本,看看版本是否真的支持语法。然后找出您的 IDE 使用特定“过时”版本程序的原因,并找到更新两者的方法。
【解决方案2】:

另一种方法是将流转换为承诺(请参阅下面的示例)并使用then(或await)将解析后的值分配给变量。

function streamToString (stream) {
  const chunks = [];
  return new Promise((resolve, reject) => {
    stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
    stream.on('error', (err) => reject(err));
    stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
  })
}

const result = await streamToString(stream)

【讨论】:

  • 您必须在异步函数中调用 streamtostring 函数。为避免这种情况,您也可以这样做streamToString(stream).then(function(response){//Do whatever you want with response});
  • 这应该是最佳答案。 恭喜您产生了唯一能正确处理所有问题的解决方案,其中 (1) 将块存储为缓冲区,并且只调用 .toString("utf8")最后,避免在多字节字符中间分割块时解码失败的问题; (2) 实际的错误处理; (3) 将代码放在一个函数中,这样可以重复使用,而不是复制粘贴; (4) 使用 Promises 所以函数可以await-ed on; (5) 与某些 npm 库不同,不会拖入一百万个依赖项的小代码; (6) ES6 语法和现代最佳实践。
  • 为什么不把 chunks 数组移动到 Promise 中呢?
  • 在我使用当前最佳答案作为提示想出了基本相同的代码后,我注意到如果流产生string 块而不是Buffer,上述代码可能会失败并出现Uncaught TypeError [ERR_INVALID_ARG_TYPE]: The "list[0]" argument must be an instance of Buffer or Uint8Array. Received type string .使用 chunks.push(Buffer.from(chunk)) 应该同时适用于 stringBuffer 块。
  • 事实证明,最佳答案迟到了:stackoverflow.com/a/63361543/1677656
【解决方案3】:

(这个答案是几年前的,当时它是最好的答案。现在下面有一个更好的答案。我没有跟上node.js,我无法删除这个答案,因为它被标记为“正确”关于这个问题”。如果你正在考虑向下点击,你想让我做什么?)

关键是使用Readable Streamdataend 事件。收听这些事件:

stream.on('data', (chunk) => { ... });
stream.on('end', () => { ... });

当您收到data 事件时,将新的数据块添加到为收集数据而创建的 Buffer 中。

当您收到end 事件时,如有必要,将完成的 Buffer 转换为字符串。然后用它做你需要做的事情。

【讨论】:

  • 几行代码说明答案比仅仅指向 API 的链接更可取。不要不同意答案,只是不相信它足够完整。
  • 使用较新的 node.js 版本,这更干净:stackoverflow.com/a/35530615/271961
  • 答案应该更新为不推荐使用 Promises 库,而是使用原生 Promises。
  • @DanDascalescu 我同意你的看法。问题是我 7 年前写了这个答案,而我没有跟上 node.js 。如果你是其他人想要更新它,那就太好了。或者我可以简单地删除它,因为似乎已经有了更好的答案。你会推荐什么?
  • @ControlAltDel:感谢您主动删除不再是最好的答案。希望其他人也有类似的discipline
【解决方案4】:

还有一个使用 Promise 的字符串:

function getStream(stream) {
  return new Promise(resolve => {
    const chunks = [];

    # Buffer.from is required if chunk is a String, see comments
    stream.on("data", chunk => chunks.push(Buffer.from(chunk)));
    stream.on("end", () => resolve(Buffer.concat(chunks).toString()));
  });
}

用法:

const stream = fs.createReadStream(__filename);
getStream(stream).then(r=>console.log(r));

如果需要,请删除 .toString() 以与二进制数据一起使用。

更新:@AndreiLED 正确指出字符串存在问题。我无法使用我拥有的节点版本获得返回字符串的流,但api 指出这是可能的。

【讨论】:

  • 我注意到如果流生成string 块而不是Buffer,上述代码可能会因Uncaught TypeError [ERR_INVALID_ARG_TYPE]: The "list[0]" argument must be an instance of Buffer or Uint8Array. Received type string 而失败。使用 chunks.push(Buffer.from(chunk)) 应该同时适用于 stringBuffer 块。
【解决方案5】:

列出的所有答案似乎都以流动模式打开可读流,这不是 NodeJS 中的默认设置,并且可能存在限制,因为它缺乏 NodeJS 在暂停可读流模式下提供的背压支持。 这是一个使用 Just Buffers、Native Stream 和 Native Stream Transforms 并支持对象模式的实现

import {Transform} from 'stream';

let buffer =null;    

function objectifyStream() {
    return new Transform({
        objectMode: true,
        transform: function(chunk, encoding, next) {

            if (!buffer) {
                buffer = Buffer.from([...chunk]);
            } else {
                buffer = Buffer.from([...buffer, ...chunk]);
            }
            next(null, buffer);
        }
    });
}

process.stdin.pipe(objectifyStream()).process.stdout

【讨论】:

    【解决方案6】:

    在我的例子中,内容类型响应标头是 Content-Type: text/plain。所以,我已经从 Buffer 中读取了数据,例如:

    let data = [];
    stream.on('data', (chunk) => {
     console.log(Buffer.from(chunk).toString())
     data.push(Buffer.from(chunk).toString())
    });
    

    【讨论】:

      【解决方案7】:

      希望这比上面的答案更有用:

      var string = '';
      stream.on('data',function(data){
        string += data.toString();
        console.log('stream data ' + part);
      });
      
      stream.on('end',function(){
        console.log('final output ' + string);
      });
      

      请注意,字符串连接并不是收集字符串部分最有效的方法,但它是为了简单起见(也许您的代码并不关心效率)。

      此外,此代码可能会为非 ASCII 文本产生不可预知的故障(它假定每个字符都适合一个字节),但也许您也不关心这一点。

      【讨论】:

      • 收集字符串部分的更有效方法是什么? TY
      • 你可以使用缓冲区docs.nodejitsu.com/articles/advanced/buffers/how-to-use-buffers 但这真的取决于你的使用。
      • 使用一个字符串数组,将每个新块附加到数组中,并在数组末尾调用join("")
      • 这是不对的。如果缓冲区在一个多字节代码点的中间,那么 toString() 将收到格式错误的 utf-8,你最终会在你的字符串中得到一堆�。
      • @alextgordon 是对的。在一些非常罕见的情况下,当我有很多块时,我会在块的开头和结尾得到这些。特别是当边缘上有俄罗斯符号时。因此,连接块并最终转换它们而不是转换块并连接它们是正确的。在我的情况下,请求是使用默认编码的 request.js 从一个服务向另一个服务发出的
      【解决方案8】:

      setEncoding('utf8');

      上面的塞巴斯蒂安 J 做得好。

      我有几行测试代码的“缓冲区问题”,并添加了编码信息并解决了它,见下文。

      证明问题

      软件

      // process.stdin.setEncoding('utf8');
      process.stdin.on('data', (data) => {
          console.log(typeof(data), data);
      });
      

      输入

      hello world
      

      输出

      object <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 0a>
      

      演示解决方案

      软件

      process.stdin.setEncoding('utf8'); // <- Activate!
      process.stdin.on('data', (data) => {
          console.log(typeof(data), data);
      });
      

      输入

      hello world
      

      输出

      string hello world
      

      【讨论】:

        【解决方案9】:

        使用流行(每周超过 500 万次下载)和轻量级 get-stream 库的简单方法:

        https://www.npmjs.com/package/get-stream

        const fs = require('fs');
        const getStream = require('get-stream');
        
        (async () => {
            const stream = fs.createReadStream('unicorn.txt');
            console.log(await getStream(stream)); //output is string
        })();
        

        【讨论】:

          【解决方案10】:

          使用您可能已经在项目依赖项中拥有的quite popular stream-buffers package,这非常简单:

          // imports
          const { WritableStreamBuffer } = require('stream-buffers');
          const { promisify } = require('util');
          const { createReadStream } = require('fs');
          const pipeline = promisify(require('stream').pipeline);
          
          // sample stream
          let stream = createReadStream('/etc/hosts');
          
          // pipeline the stream into a buffer, and print the contents when done
          let buf = new WritableStreamBuffer();
          pipeline(stream, buf).then(() => console.log(buf.getContents().toString()));
          

          【讨论】:

            【解决方案11】:

            我有更多的运气使用这样的:

            let string = '';
            readstream
                .on('data', (buf) => string += buf.toString())
                .on('end', () => console.log(string));
            

            我使用节点v9.11.1readstream 是来自http.get 回调的响应。

            【讨论】:

              【解决方案12】:

              最干净的解决方案可能是使用“string-stream”包,它将流转换为带有承诺的字符串。

              const streamString = require('stream-string')
              
              streamString(myStream).then(string_variable => {
                  // myStream was converted to a string, and that string is stored in string_variable
                  console.log(string_variable)
              
              }).catch(err => {
                   // myStream emitted an error event (err), so the promise from stream-string was rejected
                  throw err
              })
              

              【讨论】:

                【解决方案13】:

                流缩减器之类的东西怎么样?

                这是一个使用 ES6 类的示例,如何使用一个。

                var stream = require('stream')
                
                class StreamReducer extends stream.Writable {
                  constructor(chunkReducer, initialvalue, cb) {
                    super();
                    this.reducer = chunkReducer;
                    this.accumulator = initialvalue;
                    this.cb = cb;
                  }
                  _write(chunk, enc, next) {
                    this.accumulator = this.reducer(this.accumulator, chunk);
                    next();
                  }
                  end() {
                    this.cb(null, this.accumulator)
                  }
                }
                
                // just a test stream
                class EmitterStream extends stream.Readable {
                  constructor(chunks) {
                    super();
                    this.chunks = chunks;
                  }
                  _read() {
                    this.chunks.forEach(function (chunk) { 
                        this.push(chunk);
                    }.bind(this));
                    this.push(null);
                  }
                }
                
                // just transform the strings into buffer as we would get from fs stream or http request stream
                (new EmitterStream(
                  ["hello ", "world !"]
                  .map(function(str) {
                     return Buffer.from(str, 'utf8');
                  })
                )).pipe(new StreamReducer(
                  function (acc, v) {
                    acc.push(v);
                    return acc;
                  },
                  [],
                  function(err, chunks) {
                    console.log(Buffer.concat(chunks).toString('utf8'));
                  })
                );
                

                【讨论】:

                  【解决方案14】:

                  这对我有用,基于Node v6.7.0 docs

                  let output = '';
                  stream.on('readable', function() {
                      let read = stream.read();
                      if (read !== null) {
                          // New stream data is available
                          output += read.toString();
                      } else {
                          // Stream is now finished when read is null.
                          // You can callback here e.g.:
                          callback(null, output);
                      }
                  });
                  
                  stream.on('error', function(err) {
                    callback(err, null);
                  })
                  

                  【讨论】:

                    【解决方案15】:

                    我通常使用这个简单的函数将流转换为字符串:

                    function streamToString(stream, cb) {
                      const chunks = [];
                      stream.on('data', (chunk) => {
                        chunks.push(chunk.toString());
                      });
                      stream.on('end', () => {
                        cb(chunks.join(''));
                      });
                    }
                    

                    使用示例:

                    let stream = fs.createReadStream('./myFile.foo');
                    streamToString(stream, (data) => {
                      console.log(data);  // data is now my string variable
                    });
                    

                    【讨论】:

                    • 有用的答案,但看起来每个块在被推入数组之前必须转换为字符串:chunks.push(chunk.toString());
                    • 这是唯一对我有用的!非常感谢
                    • 这是一个很好的答案!
                    【解决方案16】:

                    Streams 没有简单的 .toString() 函数(我理解),也没有类似 .toStringAsync(cb) 的函数(我不理解)。

                    所以我创建了自己的辅助函数:

                    var streamToString = function(stream, callback) {
                      var str = '';
                      stream.on('data', function(chunk) {
                        str += chunk;
                      });
                      stream.on('end', function() {
                        callback(str);
                      });
                    }
                    
                    // how to use:
                    streamToString(myStream, function(myStr) {
                      console.log(myStr);
                    });
                    

                    【讨论】:

                      【解决方案17】:

                      以上都不适合我。我需要使用 Buffer 对象:

                        const chunks = [];
                      
                        readStream.on("data", function (chunk) {
                          chunks.push(chunk);
                        });
                      
                        // Send the buffer or you can put it into a var
                        readStream.on("end", function () {
                          res.send(Buffer.concat(chunks));
                        });
                      

                      【讨论】:

                      • 这实际上是最干净的方法;)
                      • 效果很好。请注意:如果您想要正确的字符串类型,则需要在 concat() 调用的结果 Buffer 对象上调用 .toString()
                      • 事实证明,最佳答案迟到了:stackoverflow.com/a/63361543/1677656
                      • 这是正确的唯一方法
                      【解决方案18】:

                      从 nodejs documentation 你应该这样做 - 永远记住一个字符串而不知道编码只是一堆字节:

                      var readable = getReadableStreamSomehow();
                      readable.setEncoding('utf8');
                      readable.on('data', function(chunk) {
                        assert.equal(typeof chunk, 'string');
                        console.log('got %d characters of string data', chunk.length);
                      })
                      

                      【讨论】:

                        猜你喜欢
                        • 1970-01-01
                        • 2021-06-27
                        • 2011-08-17
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 2012-08-24
                        • 2012-02-04
                        相关资源
                        最近更新 更多