【问题标题】:One Promise triggering zero or more Promises一个 Promise 触发零个或多个 Promise
【发布时间】:2016-09-10 05:53:56
【问题描述】:

简短的问题,相当抽象:

Process A 和 process B 都返回承诺。

进程B 需要调用进程A 零次或多次来解决其承诺。这种事情的正确模式是什么?

更长但更具体的问题:

想象一下,我有一个produceMsg 进程,它创建了一个承诺,当它被解决时,会产生一个 n 字节的缓冲区。 (也许它正在通过网络连接获取字节。也许偶尔会产生错误。)这是一个测试夹具:

// promise to yield a buffer of up to 20 bytes and an occasional error
function produceMsg() {
    return new Promise(function(resolve, reject) {
        var n = Math.floor(Math.random() * 20);
        if (n === 0) {       // generate an error sometimes...
            reject("some error");
        } else {             // create a buffer with n random bytes
            var msg = createRandomMessage(n);
            console.log('generating', msg);
            resolve(msg);
        }
    });
};

// helper method: create a buffer of n random bytes
function createRandomMessage(n) {
    return Buffer(Array(n).fill().map(function(e) {
        return Math.floor(Math.random() * 256); }));
}

现在想象我有一个方法来consume 那些承诺:

function consume() {
    setInterval(function() {
        produceMsg()
            .then(function(b)  { console.log("==> fetched", b); })
            .catch(function(b) { console.log("==> error", b); })
                ;
    }, 200);
}

测试它,它按预期工作:

generating <Buffer c8 71 6a 3f fe 84 05 71>
==> fetched <Buffer c8 71 6a 3f fe 84 05 71>
generating <Buffer 03 66>
==> fetched <Buffer 03 66>
==> error some error
generating <Buffer 49 d2 4f 6f d6 bc 48 cf e7 db f7 f6 f7 e2 e7 5c df>
==> fetched <Buffer 49 d2 4f 6f d6 bc 48 cf e7 db f7 f6 f7 e2 e7 5c df>
generating <Buffer ef 6c 5f 3c 2f c8 b1 ff b5 eb 13 0e 76 d8>
==> fetched <Buffer ef 6c 5f 3c 2f c8 b1 ff b5 eb 13 0e 76 d8>

但是现在我被告知我必须将传入的数据包重新构建为 10 字节的块。 (好吧,这是一个人为的例子,但我确实必须重新构建数据包。)

所以我需要一个中间 reframer 对象,它创建一个承诺,在它解析时返回十字节数据包。如果它没有十个字节,它需要从produceMsg 进程中收集字节,直到它积累足够多。

我修改后的consume 方法可能如下所示:

function consume() {
    var reframer = new Reframer(produceMsg);
    setInterval(function() {
        reframer.read()
            .then(function(b)  { console.log("==> fetched", b); })
            .catch(function(b) { console.log("==> error", b); })
                ;
    }, 200);
}

...并使用与上面相同的数据,我希望输出如下所示:

generating <Buffer c8 71 6a 3f fe 84 05 71>
generating <Buffer 03 66>
=> fetched <Buffer c8 71 6a 3f fe 84 05 71 03 66>
=> error some error
generating <Buffer 49 d2 4f 6f d6 bc 48 cf e7 db f7 f6 f7 e2 e7 5c df>
=> fetched <Buffer 49 d2 4f 6f d6 bc 48 cf e7 db>
generating <Buffer ef 6c 5f 3c 2f c8 b1 ff b5 eb 13 0e 76 d8>
=> fetched <Buffer f7 f6 f7 e2 e7 5c df ef 6c 5f>
=> fetched <Buffer 3c 2f c8 b1 ff b5 eb 13 0e 76>

(注意最后两行reframer 产生了两条消息,但没有调用produceMsg,因为它已经积累了足够的字节。)

问题:reframer.read() 的结构是什么?

我还没有弄清楚如何构建reframer.read() 方法的内部结构。有没有一种很好的模式来做这种事情,其中​​一个承诺有条件地将调用链接到零个或多个承诺?

(注意:我不是询问如何使用 concatslice 缓冲区等等——我已经有代码可以做到这一点。我坚持的是控制Promises的生成和解析流程。)

【问题讨论】:

  • 你似乎在寻找递归。
  • @Bergi 好吧,伪递归...

标签: javascript node.js promise


【解决方案1】:

我真的很喜欢Alnitak's 的答案,并且我有动力将其改写为一般模式。 (其他任何人都应该随时对此进行改进)。

如果您有一个 Promise 生产流程 B 需要调用 Promise 生产流程 A 零次或多次才能解决其承诺,则一般模式是:

function B(A) {
    if (have_necessary_data()) {
        return Promise.resolve(processed_data());
    } else {
        return A().then(function(incoming_data) { 
            do_something_with(incoming_data);
            return B(A);
         });
    }
};

【讨论】:

  • 这很有趣。试图更好地理解这种模式。那么本例中的B函数对应的是什么?是示例中的Reframer 类吗?那么消费者的代码会是什么样子呢? ...
  • 是的 - B()(大致)对应于上面的 Reframer。并且由于B() 返回一个Promise,消费者的代码简单地将返回值视为任何普通的promise。但是请参阅上面我的“停止新闻”评论——A() 必须在每次调用时返回一个 new 承诺,否则这将不起作用。
  • 知道了,很想知道最终的解决方案是什么样的。追求它的荣誉。
  • 简短而晦涩的答案是:当解决方案确实需要执行副作用的异步“推送”函数时,不要尝试创建返回值的同步“拉取”函数。如果你能理解我的意思,加分! :)
  • 哈哈,我想我明白了,但不要试图让我解释:)
【解决方案2】:

我认为不可能进行多重承诺解决,尽管您根本无法解决,但一次返回多个缓冲区的一种解决方案是使用格式化缓冲区数组解决最终承诺,而不是在消费者中数组:

function Reframer (msg) {
    this.msg = msg
    this.stash = []
}

Reframer.prototype.read = function (){
    return this.msg()
        .then((b) => {
            this.stash.push(b)

            // Do check here for stashed Buffers and apply appropriate logic to
            // slice and dice them if there is something to return do this with promise
            // Assuming I have define `chunk` variable which is array of 10 bytes Buffer(s)
            // And if there is anything else left it has to be stashed appropriately for future reuse if any.

            var chunk = [Buffer(10), Buffer(10)]

            return Promise.resolve(chunk)
        })
        .catch((err) => Promise.reject(err))
}

和消费者:

function consume() {
    var reframer = new Reframer(produceMsg);
    setInterval(function() {
        reframer.read()
            .then(function(abs)  { abs.forEach((b) => console.log("==> fetched", b)); })
            .catch(function(err) { console.log("==> error", err); });
    }, 200);
}

'hard' setInterval 也可能会产生某种竞争条件。恕我直言,更好的模式将使用 setTimeout 递归调用函数。

var reframerOne = new Reframer(produceMsg);
function consumeOne() {
    setTimeout(function() {
        reframerOne.read()
            .then(function(abs)  { 
                abs.forEach((b) => console.log("==> fetched", b));
                consumeOne();
            })
            .catch(function(err) { console.log("==> error", err); });
    }, 200);
}

虽然你可以在这里举个例子。所以只是在这种情况下澄清一下。

【讨论】:

  • Dmi3y:我还没有验证过,但我认为您的解决方案存在问题:假设生产者每次生产 40 口。但是reframer一直阻塞,直到producer生成消息,然后只能yield一个10字节的包,所以会越来越落后。我认为 Alnitak 的回应很有希望。
  • 好吧,如果生产者每次产生 40 次咬合,那么重构器的承诺将立即通过 4 个 chanked 缓冲区的数组来解决......并且为下一次调用留空存储空间
  • 啊——我明白了。感谢您的澄清。
【解决方案3】:

我认为你所追求的是这样的:

function Reframer(producer) {
    var buffer;     // for accumulation

    this.read = function() {
        if (buffer.length >= max) {
            frame = first "max" bytes removed from buffer
            return Promise.resolve(frame)
        } else {
            return producer.read().then((read_data) => {
                append read_data to buffer
                return this.read();  // pseudo-recurse
            });
        }
    };
}

即如果累积的缓冲区中已经有足够的字节,则从该缓冲区中删除所需的字节数,并返回一个立即用这些字节解析的承诺。

否则,请生产者发送更多数据,将其添加到缓冲区中,然后递归到上面的步骤。

如果任何.reject 是由生产者生成的,它应该只是传播并导致.read 方法也被拒绝。

[else 分支的替代版本可能会直接使用新数据解析,如果它足以填充缓冲区以避免需要将整个新数据附加到缓冲区只是为了立即将其再次删除下一个递归步骤]

【讨论】:

  • 像冠军一样工作。我没有想到我可以return Promise.resolve()——那是缺失的部分。 (是的,生产者的.rejects 被正确传播。)
  • 停止印刷机!单元测试发现了一个问题:假设生产者已经解决了一个 11 字节的数据包。 Reframer 每次调用 producer.read() 时,都会收到相同的数据包,导致数据重复。还是我错过了一些基本的东西?
  • @fearless_fool 你是否真的从临时缓冲区中删除了 10 个字节(最终在 frame 中的那些)?
  • 啊,或者您是否只期望producer 返回一个承诺 - 在我上面的代码中,我有一个 read 方法,它产生一个 new 承诺每次调用时
  • 是的,我的一个概念缺陷:显然 read 方法需要在每次调用时返回一个新的承诺(因为承诺的一般合同是评估其解决的价值。)我'将不得不考虑更多。
猜你喜欢
  • 1970-01-01
  • 2018-01-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-31
  • 1970-01-01
相关资源
最近更新 更多