【问题标题】:Attach two listeners to single axios stream将两个侦听器附加到单个 axios 流
【发布时间】:2019-12-04 04:30:39
【问题描述】:

我正在尝试从 axios 获取 pdf url 作为流。我需要进一步将该文件上传到另一个位置并返回上传文件的哈希值。我有接受流的第三方功能,并将文件上传到目标位置。如何使用相同的流来获取文件的哈希? 我正在尝试运行以下代码:

const getFileStream = await axios.get<ReadStream>(externalUrl, {
    responseType: "stream"
});
const hashStream = crypto.createHash("md5");
hashStream.setEncoding("hex");
const pHash = new Promise<string>(resolve => {
    getFileStream.data.on("finish", () => {
      resolve(hashStream.read());
 });
});

const pUploadedFile = externalUploader({
  stream: () => getFileStream.data
});

getFileStream.data.pipe(hashStream);

const [hash, uploadedFile] = await Promise.all([pHash, pUploadedFile]);

return { hash, id: uploadedFile.id };

运行此代码后,当我下载相同的 pdf 文件时,文件已损坏

【问题讨论】:

    标签: node.js node-streams


    【解决方案1】:

    您可以重复使用相同的 axios getFileStream.data 来管道到多个接收器,只要它们同时使用即可。

    以下是使用 axios 流下载文件并在将文件上传到远程服务器时“并发”计算文件的 MD5 校验和的示例。

    示例将输出标准输出:

    Incoming file checksum: 82c12f208ea18bbeed2d25170f3669a5
    File uploaded. Awaiting server response...
    File uploaded. Done.
    

    工作示例:

    const { Writable, Readable, Transform, pipeline } = require('stream');
    const crypto = require('crypto');
    const https = require('https');
    const axios = require('axios');
    
    (async ()=>{
      // Create an axios stream to fetch the file
      const axiosStream = await axios.get('https://upload.wikimedia.org/wikipedia/commons/thumb/8/86/Map_icon.svg/128px-Map_icon.svg.png', {
          responseType: "stream"
      }); 
    
      // To re-upload the file to a remote server, we can use multipart/form-data which will require a boundary key
      const key = crypto.randomBytes(16).toString('hex');
    
      // Create a request to stream the file as multipart/form-data to another server
      const req = https.request({
        hostname: 'postman-echo.com',
        path: '/post',
        method: 'POST',
        headers: {
          'content-type': `multipart/form-data; boundary=--${key}`,
          'transfer-encoding': 'chunked'
        }
      });
    
      // Create a promise that will be resolved/rejected when the remote server has completed the HTTP(S) request
      const uploadRequestPromise = new Promise(resolve => req.once('response', (incomingMessage) => {
        incomingMessage.resume(); // prevent response data from queuing up in memory
        incomingMessage.on('end', () => {
          if(incomingMessage.statusCode === 200){
            resolve();
          }
          else {
            reject(new Error(`Received status code ${incomingMessage.statusCode}`))
          }
        });
      }));
    
      // Construct the multipart/form-data delimiters
      const multipartPrefix = `\r\n----${key}\r\n` +
          'Content-Disposition: form-data; filename="cool-name.png"\r\n' +
          'Content-Type: image/png\r\n' +
          '\r\n';
      const multipartSuffix = `\r\n----${key}--`;
    
      // Write the beginning of a multipart/form-data request before streaming the file content
      req.write(multipartPrefix);
    
      // Create a promise that will be fulfilled when the file has finished uploading
      const uploadStreamFinishedPromise = new Promise(resolve => {
        pipeline(
          // Use the axios request as a stream source
          axiosStream.data,
          // Piggyback a nodejs Transform stream because of the convenient flush() call that can
          // add the multipart/form-data suffix
          new Transform({
            objectMode: false,
            transform( chunk, encoding, next ){
              next( null, chunk );
            },
            flush( next ){
              this.push( multipartSuffix );
              next();
            }
          }),
          // Write the streamed data to a remote server
          req,
          // This callback is executed when all data from the stream pipe has been processed
          (error) => {
            if( error ){
              reject( error );
            }
            else {
              resolve();
            }
          }
        )
      });
    
      // Create a MD5 stream hasher
      const hasher = crypto.createHash("md5");
    
      // Create a promise that will be resolved when the hash function has processed all the stream
      // data
      const hashPromise = new Promise(resolve => pipeline(
          // Use the axios request as a stream source.
          // Note that it's OK to use the same stream to pipe into multiple sinks. In this case, we're
          // using the same axios stream for both calculating the haas, and uploading the file above
          axiosStream.data,
          // The has function will process stream data 
          hasher,
          // This callback is executed when all data from the stream pipe has been processed
          (error) => {
            if( error ){
              reject( error );
            }
            else {
              resolve();
            }
          }
        ));
    
      /**
       * Note that there are no 'awaits' before both stream sinks have been established. That is 
       * important since we want both sinks to process data from the beginning of stream
       */
    
      // We must wait to call the hash function's digest() until all the data has been processed
      await hashPromise;
      const hash = hasher.digest("hex");
      console.log("Incoming file checksum:", hash);
    
      await uploadStreamFinishedPromise;
      console.log("File uploaded. Awaiting server response...");
      await uploadRequestPromise;
      console.log("File uploaded. Done.");
    })()
      .catch( console.error );
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-08-02
      • 1970-01-01
      • 1970-01-01
      • 2012-11-11
      • 2013-01-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多