【问题标题】:Closing a redis subscription and ending the go routine when websocket connection closes当 websocket 连接关闭时关闭 redis 订阅并结束 go 例程
【发布时间】:2018-01-14 05:58:39
【问题描述】:

我正在将事件从 redis 订阅推送到通过 websocket 连接的客户端。当客户端断开 websocket 时,我无法取消订阅和退出 redis go 例程。

this post 的启发,这就是我目前所拥有的。我能够接收订阅事件并通过 websocket 向客户端发送消息,但是当客户端关闭 websocket 并且defer close(done) 代码触发时,我的case b, ok := <-done: 不会触发。默认情况下好像超载了???

package api

import (
    ...

    "github.com/garyburd/redigo/redis"
    "github.com/gorilla/websocket"
)

func wsHandler(w http.ResponseWriter, r *http.Request) {
    var upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        HandleError(w, err)
        return
    }
    defer conn.Close()

    done := make(chan bool)
    defer close(done)

    for {
        var req WSRequest
        err := conn.ReadJSON(&req)
        if err != nil {
            HandleWSError(conn, err)
            return
        }
        defer conn.Close()

        go func(done chan bool, req *WSRequest, conn *websocket.Conn) {
            rc := redisPool.Get()
            defer rc.Close()

            psc := redis.PubSubConn{Conn: rc}
            if err := psc.PSubscribe(req.chanName); err != nil {
                HandleWSError(conn, err)
                return
            }
            defer psc.PUnsubscribe()

            for {
                select {
                case b, ok := <-done:
                    if !ok || b == true {
                        return
                    }
                default:
                    switch v := psc.Receive().(type) {
                    case redis.PMessage:
                        err := handler(conn, req, v)
                        if err != nil {
                            HandleWSError(conn, err)
                        }

                    case redis.Subscription:
                         log.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)

                    case error:
                         log.Printf("error in redis subscription; err:\n%v\n", v)
                         HandleWSError(conn, v)

                    default:
                        // do nothing...
                        log.Printf("unknown redis subscription event type; %s\n", reflect.TypeOf(v))
                    }
                }
            }
        }(done, &req, conn)
    }
}

【问题讨论】:

  • 为什么应用程序会获得新连接并订阅收到的每条消息?
  • 也许我误解了redis.PubSubConn 的工作原理。每个客户都想订阅一个独特的频道。
  • 我简化了我的实际代码,每个 websocket 客户端都会传递一个通道名称字符串。
  • 请简要描述一下应用程序在做什么。描述发送到和从 websocket 客户端发送的消息,以及这些消息与 pubsub 消息和频道名称的关系。
  • 每当某个表发生变化时,我的 postgres 数据库中的触发器都会发送通知。 go 中的侦听器获取该表更改事件并将其发布到 redis 上。通道名称的结构方式使其稍后可以由感兴趣的客户进行模式匹配。客户端通过 websocket 连接到服务器并发送描述他们感兴趣的表更改事件类型的有效负载。他们订阅的频道是基于此有效负载生成的。根据有效负载,客户端可以订阅无数个可能的频道名称。

标签: go redis


【解决方案1】:

在为 websocket 连接提供服务后,进行这些更改以跳出读取循环:

  • 维护为此 websocket 连接创建的 Redis 连接的一部分。

  • 完成后取消订阅所有连接。

  • 修改读取循环以在订阅计数为零时返回。

代码如下:

func wsHandler(w http.ResponseWriter, r *http.Request) {
    var upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        HandleError(w, err)
        return
    }
    defer conn.Close()

    // Keep slice of all connections. Unsubscribe all connections on exit.
    var pscs []redis.PubSubConn
    defer func() {
        for _, psc := range rcs {
           psc.Unsubscribe() // unsubscribe with no args unsubs all channels
        }
    }()

    for {
        var req WSRequest
        err := conn.ReadJSON(&req)
        if err != nil {
            HandleWSError(conn, err)
            return
        }

        rc := redisPool.Get()
        psc := redis.PubSubConn{Conn: rc}
        pscs = append(pscs, psc)

        if err := psc.PSubscribe(req.chanName); err != nil {
            HandleWSError(conn, err)
            return
        }

        go func(req *WSRequest, conn *websocket.Conn) {
            defer rc.Close()
            for {
                switch v := psc.Receive().(type) {
                case redis.PMessage:
                    err := handler(conn, req, v)
                    if err != nil {
                        HandleWSError(conn, err)
                    }

                case redis.Subscription:
                     log.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
                     if v.Count == 0 {
                         return
                     }

                case error:
                     log.Printf("error in redis subscription; err:\n%v\n", v)
                     HandleWSError(conn, v)

                default:
                    // do nothing...
                    log.Printf("unknown redis subscription event type; %s\n", reflect.TypeOf(v))
                }
            }
        }(&req, conn)
    }
}

问题中的代码和这个答案为每个 websocket 客户端拨打多个 Redis 连接。一种更典型且可扩展的方法是跨多个客户端共享单个 Redis pubsub 连接。考虑到高级描述,典型方法可能适合您的应用程序,但鉴于问题中的代码,我仍然不确定您要做什么。

【讨论】:

  • 谢谢!光看代码,肯定会完成退订。我会试一试。 “一种更典型且可扩展的方法是在多个客户端之间共享单个 Redis pubsub 连接。”是否可以在多个 websocket 连接之间共享一个 pubsub 连接,然后让每个 websocket 订阅一个唯一的频道并且只接收来自该频道的消息?我不希望 websocket 从它不感兴趣的频道接收消息。
  • 您需要编写几行代码来解复用 pubsub 消息以共享单个 redis 连接。朝着正确方向迈出的一小步是为每个客户端使用单个 Redis 连接。这可能是一个简单的更改,但我不确定,因为我不明白您要做什么(高级描述和代码之间存在很大差距)。如果一个客户端只能订阅一个redis频道,那么改变就很简单了。
  • "您需要编写几行代码来解复用 pubsub 消息以共享单个 redis 连接。"想象一下,如果我每秒有 10k 个 websocket 连接和 1k 个表更改事件。然后我需要每秒进行 10MM 消息多路分解。我可以在没有 redis 的情况下为每个 websocket 客户端解复用消息,只需使用 postgres 通知/侦听。我选择添加 redis 以将解复用的负担从我的服务器转移到 redis 上。
  • 您需要每秒解复用 1k 个事件,而不是 10mm。将数据发送到 Redis 并返回不会从前端服务器卸载工作,因为与发送或接收 Redis 消息的成本相比,解复用事件(它基本上是地图查找)的成本很小。
猜你喜欢
  • 1970-01-01
  • 2016-07-22
  • 2017-05-30
  • 2019-09-07
  • 2021-10-30
  • 1970-01-01
  • 2018-11-26
  • 2012-02-21
  • 2022-11-07
相关资源
最近更新 更多