【问题标题】:Attempt at developing a "pull mode" for a RxJS-like structure gets RangeError尝试为类似 RxJS 的结构开发“拉模式”得到 RangeError
【发布时间】:2019-12-03 10:07:04
【问题描述】:

我正在尝试开发一个类似 RxJS 的方案:一个基于事件的“可观察”对象。到目前为止,一切都很好 !

除了默认的类似 RxJS 的推送模式之外,当我尝试在方案中添加“拉模式”时,问题就出现了。

我试图了解代码的哪一部分似乎用完了堆栈。

这是一个运行良好的 sn-p:

const { Channel } = require('./channel');

var c = new Channel();

c.onClosed( (e) => {
    console.log(e);
    console.log("Done");
});

c.listen( (e) => {
    console.log(e);
});

let i = 0;
for (i=0; i<1000000; i++) {
    c.send(++i);
}
c.close(i);

这里是触发异常的sn-p:

const { Channel } = require('./channel');

var sc = new Channel();

let i = 0;

sc.onClosed( (e) => {
    console.log(e);
    console.log("Done");
});

sc.sendAll( () => {
    if (i<100000) {
        sc.send(++i);
        console.log(i); 
    }
    sc.close(i);
});

sc.listen( (e) => {
    console.log(e);
    sc.next();
});

sc.send(1);

这是lib的相关部分:

const EventEmitter = require('events');


class Channel {

    id: string;
    emitter: any;
    isClosed: boolean;

    constructor() {
        this.emitter = new EventEmitter();
        this.isClosed = false;
    }

    listen = (callback) => {
        this.emitter.on("data", callback);
    }

    send = (value) => {
        this.emitter.emit("data", value);
    }

    next = () => {
        this.emitter.emit("received");
    }

    sendAll = (callback) => {
        this.emitter.on("received", callback);
    }

    close = (value) => {
        this.isClosed = true;
        this.emitter.emit("closed", value);
    }

    onClosed = (callback) => {
        this.isClosed = true;
        this.emitter.on("closed", callback);
    }

}


module.exports = {
    Channel: Channel
}

如果迭代次数(“channel”上的“send()”调用次数超过 10.000 次),第二个 sn-p 会触发 RangeError 异常

【问题讨论】:

    标签: javascript node.js data-structures reactive-programming


    【解决方案1】:

    简短回答:sc.sendAll() 和 sc.listen() 之间的来回调用创建了一个两阶段递归函数调用:

    function f() {
        g();
    }
    
    function g() {
        f();
    }
    
    f();
    
    

    因为在这个方案中,每个新的事件侦听器都会递归地产生一个新的回调(在回调内等...)。

    更准确地说:事件适用于实现“推送”(“发送”)方案(参见 RxJS 或 Kefir),但它们不能用于“拉取”(“发送并等待”)确认”)方案(至少不是最初问题中的意图)。

    从这个意义上说,RxJS 的 observables 或 Node 事件(更普遍的 JavaScript 响应式数据结构)与迭代器不同:observable 必须“按原样”(而不是“按需”)处理传入的数据流。从这个意义上说,最初帖子的建议“频道”结构也不同于 Golang 频道,因为它没有“阻塞”模式。

    结论:类似 golang 的“通道”对象可以使用 Node 事件在 JavaScript 中实现,但不能使用相同的方案使其“阻塞”。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-05-22
      • 1970-01-01
      • 2019-06-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多