【问题标题】:Node Streams - Listening for unpipe in a Readable Stream节点流 - 在可读流中监听 unpipe
【发布时间】:2017-08-26 07:44:27
【问题描述】:

所以,我创建了一个读取流,它首先连接到 SFTP 并开始从文件中读取。在任何时候,我的代码都可以解除该 readstream 的管道并执行其他操作。例如,我可能会使用它来获取 CSV 的前几行并停止读取。

问题是,我不知道如何在我的 readStream 构造函数中监听 unpipe 事件,以便正确关闭 SFTP 连接。我在写流中使用flush 方法,读流有类似的方法吗?

这是我的 readStream 构造函数的简化部分:

const Client = require('ssh2').Client,
      nom = require('noms');

function getStream (get) {
    const self = this;
    const conn = new Client();

    let client,
        fileData,
        buffer,
        totalBytes = 0,
        bytesRead = 0;

    let read = function(size,next) {
        const read = this;
        // Read each chunk of the file
        client.read(fileData, buffer, bytesRead, size, bytesRead,
            function (err, byteCount, buff, pos) {
                bytesRead += byteCount;
                read.push(buff);
                next();
            }
        );
    };

    let before = function(start) {
        // setup the connection BEFORE we start _read
        conn.on('ready', function(){
            conn.sftp(function(err,sftp) {
                sftp.open(get, 'r', function(err, fd){
                    sftp.fstat(fd, function(err, stats) {
                        client = sftp;
                        fileData = fd;
                        totalBytes = stats.size;
                        buffer = new Buffer(totalBytes);

                        start();
                    });
                });
            });
        }).connect(credentials);
    };

    return nom(read,before);
}

稍后我可能会拨打myStream.pipe(writeStream),然后拨打myStream.unpipe()。但因为我无法监听那个unpipeevent,读取停止,但 SFTP 连接保持打开状态并最终超时。

有什么想法吗?

【问题讨论】:

    标签: node.js fs node-streams


    【解决方案1】:

    所以,经过更多研究,我了解到当您调用 readStream.unpipe(writeStream) 时,ReadStreams 没有通过 unpipe 事件。该事件仅传递给 writeStream。为了监听 unpipe,您需要在 readStream 上显式发出一个事件,如下所示:

    readStream.emit('unpipe');
    

    您可以在流构造函数内部或外部的任何地方侦听此事件,这非常方便。所以,这将使上面的代码看起来像这样:

    function getStream (get) {
        /**
         * ... stuff
         * ... read()
         * ... before()
         * ... etc
         */
    
        let readStream = nom(read,before);
    
        readStream.on('unpipe', function(){
            console.log('called unpipe on read stream');
        });
    
        return readStream;
    }
    

    故事的寓意是,流已经有了Event Emitter class methods,因此您可以开箱即用地发出和监听自定义事件。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-08-19
      • 2019-05-06
      • 2018-02-09
      • 1970-01-01
      • 2014-12-19
      • 2011-08-20
      • 1970-01-01
      相关资源
      最近更新 更多