【发布时间】:2020-09-21 16:42:46
【问题描述】:
据此answer,
消息总线是一种消息传递基础架构,允许不同系统通过一组共享的接口(消息总线)进行通信。
以下是main() 发起的createHub() 函数和Run() 方法,用于创建消息中心以将发布者与多个订阅者进行通信:
type PubHub struct {
subscribers map[*subscriptionmediator.HandlerSubscription]struct{}
Register chan *subscriptionmediator.HandlerSubscription
Unregister chan *subscriptionmediator.HandlerSubscription
Broadcast chan *events.Env
}
func createHub() *PubHub {
return &PubHub{
subscribers: map[*subscriptionmediator.HandlerSubscription]struct{}{},
Register: make(chan *subscriptionmediator.HandlerSubscription),
Unregister: make(chan *subscriptionmediator.HandlerSubscription),
Broadcast: make(chan *events.Envelope),
}
}
func (h *PubHub) Run() {
for {
select {
case subscriber := <-h.Register:
h.subscribers[subscriber] = struct{}{}
case subscriber := <-h.Unregister:
if _, ok := h.subscribers[subscriber]; ok {
delete(h.subscribers, subscriber)
}
case message := <-h.Broadcast:
for subscriber := range h.subscribers {
subscriber.DataChannel <- message
}
}
}
}
每个订阅者注册的地方,如下图:
subscription := &subscriptionmediator.HandlerSubscription{
conn,
make(chan *events.Envelope),
}
hub.Register <- subscription
DataChannel 用于发布者和多个订阅者之间的通信
type HandlerSubscription struct {
ConnInstance *websocket.Conn
DataChannel chan *events.Envelope
}
1) 上面的代码可以被认为是遵循基于消息总线的发布-订阅模式吗?
2) 如何避免一个订阅者阻止所有订阅者在频道上发信号? subscriber.DataChannel <- message
【问题讨论】:
-
不管怎么称呼它:
subscriber.DataChannel <- message这是阻塞,这意味着一个缓慢或(更糟)卡住的订阅者会阻塞整个世界。 -
@CeriseLimón 那么,
select语法是否通过发送消息 (client.send <- message) 来验证客户端是否被阻止?如果不成功,则运行default案例并关闭频道... -
@CeriseLimón 可以将
PubHub视为here 提到的消息中心的实现吗? -
@CeriseLimón 相关问题:stackoverflow.com/questions/69641809/…
标签: go design-patterns publish-subscribe