【问题标题】:Implementing Observables into a persistent queue library将 Observable 实现到持久队列库中
【发布时间】:2016-12-16 05:15:13
【问题描述】:

目前正在编写一个小的持久队列库,它将读取/写入文本文件中的行。这里是add方法,例如:

Queue.prototype.add = function(line, cb){

    getLock(this, err => {
        if(err){
            this.emit('error', err);
            releaseLock(err, cb);
        }
        else{
            fs.appendFile(this.filepath, line, err => {
               err && this.emit('error', err);
               releaseLock(err, cb);
            });
        }
    });
};

我觉得很尴尬的是支持事件发射器和回调(或事件发射器和承诺)。

换句话说,对于队列中的每个方法(添加、查看、删除),我需要返回/回调一个特定于每个调用的结果。使用事件发射器仅意味着调用者可能会针对并非特定于他们刚刚进行的调用的结果采取行动。所以这里的回调或承诺似乎势在必行——你不能只使用事件发射器。

我想知道的是 - observables 能否以某种方式解决必须将回调与事件发射器或承诺与事件发射器配对的问题?

我正在寻找一种方法来实现这个事件/异步队列,只有一种类型的异步回调机制。也许 observables 不是这里的答案,但我仍然在寻找一个好的设计模式。

【问题讨论】:

    标签: javascript node.js rxjs observable


    【解决方案1】:

    我不太清楚你为什么在这里需要事件发射器....如果你使用 observables,每个订阅者都会从他们自己的调用中得到结果/错误。

    我会这样重写你的方法:

    function appendFileObs(filePath, line){
        return Rx.Observable.create((obs) => {
            fs.appendFile(filePath, line, (err, result) => {
                if(err) obs.onError(err);
                else {
                    obs.onNext(result);
                    obs.onCompleted();
                }
            });
        });
    });
    // Similar for getLock and releaseLock
    
    
    Queue.prototype.add = function(line){
        return getLockObs(this)
            .flatMap(() => appendFileObs(this.filePath, line))
            .flatMap(result => releaseLockObs(undefined).map(() => result))
            .catch((err) => {
                return releaseLockObs(err);
            });
    };
    

    在这个解决方案中,我对流内部有副作用并不感到自豪,它可能是可以改进的,但你明白了。

    这样,当有人调用 .add(line).subscribe() 时,它会得到结果和他调用时发生的错误。

    如果您需要广播发生的错误,您可以使用 BehaviourSubject,它同时是一个观察者和可观察者(有用的东西!)

    【讨论】:

    • 谢谢,这将帮助我 - 监听器是否还有一种方法可以监听队列发出的所有事件(使用 observables)?至少这对调试很有用。
    • 哦,我想你已经通过暗示“BehaviorSubject”回答了我的问题
    • 我猜想用 observables 不可能链接方法,例如:queue.add(line1).add(line2).add(line3).subscribe() 还是这样?
    • 我需要一个事件发射器,因为队列中可能有一个订阅者正在侦听添加到队列中的新项目。但是同一个订阅者需要监听错误,所以如果有错误我们必须发出它。
    • 第二点,我认为您仍然可以使用 Observables,但有一个限制:一旦您在一个流上调用 onError,该流将关闭、最终确定,因此您无法再发送事件。人们用来解决此问题的一种方法是在onNext(类似{isSucessful:boolean, result:object} 的对象)上发送错误,并仅在不可恢复的错误(例如崩溃)上使用onError。如果您觉得这更适合您的情况,您可以使用事件发射器。
    猜你喜欢
    • 2011-02-21
    • 2017-07-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-21
    • 2023-04-05
    • 2018-01-09
    相关资源
    最近更新 更多