【发布时间】:2018-10-02 13:35:37
【问题描述】:
我目前从事微服务架构的工作。 在我将 NATS 插入我的项目之前,我想用它测试一些简单的场景。
在一个场景中,我有一个简单的发布者,它在 localhost:4222 上运行的基本 Nats 服务器的 for 循环中发布 100.000 条消息。
最大的问题是订阅者。当他收到 30.000 - 40.000 条消息时,我的整个 main.go 程序和所有其他 go 例程都会停止并且什么也不做。我可以用 ctrl + c 退出。但是发布者仍在继续发送消息。当我打开一个新终端并启动一个新的订阅者实例时,一切都会再次正常工作,直到订阅者收到大约 30000 条消息。最糟糕的是,服务器上什至没有出现一个错误,也没有日志,所以我不知道发生了什么。
之后我尝试用 QueueSubscribe 方法替换订阅方法,一切正常。
Subscribe 和 QueueSubscribe 的主要区别是什么?
NATS-Streaming 是更好的机会吗?或者在哪些情况下我应该更喜欢 Streaming 以及标准 NATS-Server
这是我的代码:
出版商:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
func main() {
go createPublisher()
for {
}
}
func createPublisher() {
log.Println("pub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
msg := make([]byte, 16)
for i := 0; i < 100000; i++ {
nc.Publish("alenSub", msg)
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond)
}
log.Println("pub finish")
nc.Flush()
}
订阅者:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
var received int64
func main() {
received = 0
go createSubscriber()
go check()
for {
}
}
func createSubscriber() {
log.Println("sub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.Subscribe("alenSub", func(msg *nats.Msg) {
received++
})
nc.Flush()
for {
}
}
func check() {
for {
fmt.Println("-----------------------")
fmt.Println("still running")
fmt.Println("received", received)
fmt.Println("-----------------------")
time.Sleep(time.Second * 2)
}
}
【问题讨论】:
-
Publish、Subscribe和Flush都返回error。也许添加错误检查并查看这些调用中的任何一个是否为您提供了可能有用的信息?另请注意,您在订阅者中的received上存在数据竞争,因为您正在读取它并在单独的 goroutine 中写入它。考虑将其切换为sync/atomic或添加互斥体。 -
在发布方面,我会使用
Request发送消息并等待确认回复。还要设置 NATS 连接最大重试次数以连接到-1(因此它总是重新连接)。 -
友好的 Go 建议。 从不忽略返回的
error,EVER。 -
安装 delve 调试器:github.com/derekparker/delve/tree/master/Documentation/… 当订阅者挂起时,使用 dlv 附加到进程并查看所有 goroutine 的堆栈跟踪。您应该能够准确地看到它挂在哪里,并获得一些关于哪里出了问题的线索。
-
好的,我已经添加了 atomic 并捕获了来自发布者和订阅者的所有错误,但我仍然一无所获。它只是停止了,用 delve 我也找不到任何东西。这是我的实际订阅者代码play.golang.org/p/TjnVgrGfyv
标签: go message-queue microservices nats.io