【问题标题】:Guarantee sequential order of execution of an arbitrary number of callbacks保证任意数量的回调的执行顺序
【发布时间】:2019-03-26 16:43:53
【问题描述】:
我正在订阅一个事件队列,每次我收到一个事件时,我都必须发出一个异步 HTTP 请求并发布响应,按照我将事件接收到另一个队列的相同顺序。所以基本上订阅一个 pub/sub 应用程序,做一些异步计算并发布计算结果而不是另一个 pub/sub 应用程序。由于我没有要执行的回调数量,我不能使用 async.series。
我想做的是创建一个队列,让我插入一条消息和一个 id,并在每次插入的 id 等于发出的最后一个 id + 1 时发出一个事件。然后我会订阅这个每次我从队列中取出一个事件时排队并发布到我的 pub/sub 应用程序,因为这样可以保证顺序。
我需要做的似乎是一项非常常见的任务,但我一直无法找到一个模块。 NPM 上是否有已经做到这一点的东西,或者有更好的方法来完成我所需要的?
【问题讨论】:
标签:
node.js
message-queue
publish-subscribe
【解决方案1】:
我最终创建了自己的模块来做我需要的事情,即能够订阅发布/订阅应用程序并发布到另一个应用程序,在两者之间进行一些异步工作,但仍保持消息的顺序由订阅者接收。
有了这个模块,我可以在我的 OrderedPubSub 模块的on("message") 接收到来自第一个应用程序的事件和otherApplication.publish(message) 时需要执行的异步工作的回调中执行orderedPubSub.publish(id, message)。
我希望有另一种方法可以做到这一点,或者 NPM 中已经有一个模块。
const EventEmitter = require('events');
class OrderedPubSub extends EventEmitter {
constructor(initialId = 0) {
super()
this.lastPublishedId = initialId
this.messages = {}
}
publish(id, message) {
this.messages[id] = message
this.publishAllAvailable()
}
publishAllAvailable() {
let messageId;
while((messageId = this.lastPublishedId + 1) in this.messages) {
const message = this.messages[messageId]
delete this.messages[messageId]
this.lastPublishedId++
this.emit("message", message)
}
}
}
const orderedPubSub = new OrderedPubSub();
orderedPubSub.on('message', message => {
console.log(`Received message: "${message}"`)
});
orderedPubSub.publish(3, "third message")
orderedPubSub.publish(2, "second message")
orderedPubSub.publish(4, "fourth message")
orderedPubSub.publish(10, "tenth message")
orderedPubSub.publish(1, "first message")
//outputs
// Received message: "first message"
// Received message: "second message"
// Received message: "third message"
// Received message: "fourth message"