【发布时间】:2018-01-14 05:58:39
【问题描述】:
我正在将事件从 redis 订阅推送到通过 websocket 连接的客户端。当客户端断开 websocket 时,我无法取消订阅和退出 redis go 例程。
受this post 的启发,这就是我目前所拥有的。我能够接收订阅事件并通过 websocket 向客户端发送消息,但是当客户端关闭 websocket 并且defer close(done) 代码触发时,我的case b, ok := <-done: 不会触发。默认情况下好像超载了???
package api
import (
...
"github.com/garyburd/redigo/redis"
"github.com/gorilla/websocket"
)
func wsHandler(w http.ResponseWriter, r *http.Request) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
HandleError(w, err)
return
}
defer conn.Close()
done := make(chan bool)
defer close(done)
for {
var req WSRequest
err := conn.ReadJSON(&req)
if err != nil {
HandleWSError(conn, err)
return
}
defer conn.Close()
go func(done chan bool, req *WSRequest, conn *websocket.Conn) {
rc := redisPool.Get()
defer rc.Close()
psc := redis.PubSubConn{Conn: rc}
if err := psc.PSubscribe(req.chanName); err != nil {
HandleWSError(conn, err)
return
}
defer psc.PUnsubscribe()
for {
select {
case b, ok := <-done:
if !ok || b == true {
return
}
default:
switch v := psc.Receive().(type) {
case redis.PMessage:
err := handler(conn, req, v)
if err != nil {
HandleWSError(conn, err)
}
case redis.Subscription:
log.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Printf("error in redis subscription; err:\n%v\n", v)
HandleWSError(conn, v)
default:
// do nothing...
log.Printf("unknown redis subscription event type; %s\n", reflect.TypeOf(v))
}
}
}
}(done, &req, conn)
}
}
【问题讨论】:
-
为什么应用程序会获得新连接并订阅收到的每条消息?
-
也许我误解了
redis.PubSubConn的工作原理。每个客户都想订阅一个独特的频道。 -
我简化了我的实际代码,每个 websocket 客户端都会传递一个通道名称字符串。
-
请简要描述一下应用程序在做什么。描述发送到和从 websocket 客户端发送的消息,以及这些消息与 pubsub 消息和频道名称的关系。
-
每当某个表发生变化时,我的 postgres 数据库中的触发器都会发送通知。 go 中的侦听器获取该表更改事件并将其发布到 redis 上。通道名称的结构方式使其稍后可以由感兴趣的客户进行模式匹配。客户端通过 websocket 连接到服务器并发送描述他们感兴趣的表更改事件类型的有效负载。他们订阅的频道是基于此有效负载生成的。根据有效负载,客户端可以订阅无数个可能的频道名称。