【问题标题】:How to ensure redis subscriber receive message in Go (Golang)?如何确保 redis 订阅者在 Go (Golang) 中接收消息?
【发布时间】:2016-12-17 09:30:56
【问题描述】:

我正在使用 gin 框架来构建 API 服务器。一般来说,我正在构建 2 个项目。项目'API' 和项目'SOCKET'。项目'API' 是Android 中使用的主要REST API,使用gin 框架(golang)开发。而 Project 'SOCKET' 是客户端的套接字服务器,它将使用套接字连接,使用 node.js (Socket.IO)

这个过程是这样开始的:
用户A:作为请求者;连接到"API"
用户B:作为响应者; B 连接到"SOCKET"

User A从android调用APIrequestData,请求将由"API"的项目处理。而Project"API"会记录请求,并发布到redis上 作为 new_request 使用 pubsub

这是代码示例:

client := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "", // no password set
    DB:       0,  // use default DB
})

pong, err := client.Ping().Result()

fmt.Println(pong, err)

 if err !=nil {
    fmt.Println("err",err);
 }


pubsub, err := client.Subscribe("responseclient")
if err !=nil {
    panic(err)
}
defer pubsub.Close()

err = client.Publish("new_request", "Example New Request").Err()

if err !=nil {
    panic(err)
}
msg, err :=pubsub.ReceiveMessage()
if err != nil {
    panic(err)
}

fmt.Println(msg.Channel, msg.Payload)

}

在项目"SOCKET" 中,有一个订阅者将监听发生的每个发布,并将新消息发布到频道responseclient 这是示例代码:

ioApp.on ('connection' , function(socket) {
redisSub.on('new_request', function (channel, message) {
    console.log(channel + ':' + message);

    redisPub.publish("responseclient", JSON.stringify(res));    

}); 

})

如果用户 B 已连接到 Socket.IO,则此工作顺利进行。但是如果用户B离线,或者没有连接到socket.io,这将等待很长时间,直到我们手动杀死或者直到用户B在线

我要求的是:

  1. 我们可以在 redis pub/sub 上创建类似 callback 的东西吗?如果订阅者由于离线或其他原因不接受消息,我们将关闭连接。这可能吗?
  2. 在 Node.Js 中我知道我可以使用超时功能,如果在特定时间没有收到消息,它将关闭订阅或发出任何事件,如何在 golang 上执行此操作?如果User B 处于活动状态或离线状态,我需要通知User A,以便他可以等待另一个时间来创建请求。
  3. 如果什么都做不到,您对我有什么建议?

我希望我的问题可以理解,并且可以很好地回答。
*可能有些代码,缺少变量。
** 我将这个库用于 golang redis:go-redis

【问题讨论】:

  • 对于第一个问题,答案是否定的:stackoverflow.com/questions/23675394/…。查看 antirez 的评论以获得可能的解决方案
  • 感谢您的回答。寻找 ACK 可能是我的另一个解决方案。因为我在等待第二个问题的答案呵呵:D

标签: go redis node-redis go-gin


【解决方案1】:

1) Redis 中没有回调。

2) 在 Go 中实现超时的常用方法是使用通道和选择 - 其中一个是您执行阻塞的通道,另一个通道在超时时接收消息。可以在herehere for the docs 找到这样的示例

现在对于 (3),您可以选择一些方法。第一种是使用列表,从一侧推送(发布)并从另一侧弹出(订阅)。对于接收器,您可以使用BRPOPBLPOP - 分别阻止来自右侧或左侧的流行音乐。您可以将两者结合起来以获得持久的消息传递。

现在 PUBSUB 的一部分还取决于您要发布到的内容。如果您要发布到一个会有订阅者的频道当且仅当有一个用户连接来接收它(因此只有一个订阅者到该频道),您可以检查来自的响应您的发布命令。它会告诉你它发布给了多少客户。如果频道仅由在线接收者订阅,则返回“1”,如果用户离线,则返回“0”。

第三个例子是将消息存储在一个有序集合中,以时间戳作为分数。这将允许接收者连接并从上次连接时获取消息 - 但这假设在某处存在一些持久性 - 通常是客户端。您还需要对排序集进行一些清理活动。

在这种情况下要考虑的其他一些事情是您最终是否使用复制,在这种情况下,您必须明确考虑故障转移 - 尽管在您描述的情况下,您确实需要考虑断开连接和重新连接。 my post on reliable PUBSUB 有具体的例子。

【讨论】:

  • 感谢您的回答。对我来说最快的方法是在频道上选择。
【解决方案2】:
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // no password set
        DB:       0,  // use default DB
    })

    subscribe := rdb.Subscribe(ctx, "hello")
    subscriptions := subscribe.ChannelWithSubscriptions(ctx, 1)
    go func() {
        var sentCount = 0
        for  {
            rdb.Publish(ctx,"hello",time.Now().UnixNano())
            sentCount++
            if sentCount >300{
                break
            }
        }
    }()
    for  {
        select {
        case sub:=<-subscriptions:
            fmt.Println(sub)
        }
    }



}

【讨论】:

    猜你喜欢
    • 2020-12-14
    • 1970-01-01
    • 2021-10-20
    • 1970-01-01
    • 2019-06-23
    • 1970-01-01
    • 2010-11-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多