【问题标题】:Golang Nats subscribe issueGolang Nats 订阅问题
【发布时间】:2018-10-02 13:35:37
【问题描述】:

我目前从事微服务架构的工作。 在我将 NATS 插入我的项目之前,我想用它测试一些简单的场景。

在一个场景中,我有一个简单的发布者,它在 localhost:4222 上运行的基本 Nats 服务器的 for 循环中发布 100.000 条消息。

最大的问题是订阅者。当他收到 30.000 - 40.000 条消息时,我的整个 main.go 程序和所有其他 go 例程都会停止并且什么也不做。我可以用 ctrl + c 退出。但是发布者仍在继续发送消息。当我打开一个新终端并启动一个新的订阅者实例时,一切都会再次正常工作,直到订阅者收到大约 30000 条消息。最糟糕的是,服务器上什至没有出现一个错误,也没有日志,所以我不知道发生了什么。

之后我尝试用 QueueSubscribe 方法替换订阅方法,一切正常。

Subscribe 和 QueueSubscribe 的主要区别是什么?

NATS-Streaming 是更好的机会吗?或者在哪些情况下我应该更喜欢 Streaming 以及标准 NATS-Server

这是我的代码:

出版商:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

func main() {
    go createPublisher()

    for {

    }
}

func createPublisher() {

    log.Println("pub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        nc.Publish("alenSub", msg)
        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond)
    }

    log.Println("pub finish")

    nc.Flush()

}

订阅者:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

var received int64

func main() {
    received = 0

    go createSubscriber()
    go check()

    for {

    }
}

func createSubscriber() {

    log.Println("sub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("alenSub", func(msg *nats.Msg) {
        received++
    })
    nc.Flush()

    for {

    }
}

func check() {
    for {
        fmt.Println("-----------------------")
        fmt.Println("still running")
        fmt.Println("received", received)
        fmt.Println("-----------------------")
        time.Sleep(time.Second * 2)
    }
}

【问题讨论】:

  • PublishSubscribeFlush 都返回 error。也许添加错误检查并查看这些调用中的任何一个是否为您提供了可能有用的信息?另请注意,您在订阅者中的received 上存在数据竞争,因为您正在读取它并在单独的 goroutine 中写入它。考虑将其切换为 sync/atomic 或添加互斥体。
  • 在发布方面,我会使用Request 发送消息并等待确认回复。还要设置 NATS 连接最大重试次数以连接到-1(因此它总是重新连接)。
  • 友好的 Go 建议。 从不忽略返回的errorEVER
  • 安装 delve 调试器:github.com/derekparker/delve/tree/master/Documentation/… 当订阅者挂起时,使用 dlv 附加到进程并查看所有 goroutine 的堆栈跟踪。您应该能够准确地看到它挂在哪里,并获得一些关于哪里出了问题的线索。
  • 好的,我已经添加了 atomic 并捕获了来自发布者和订阅者的所有错误,但我仍然一无所获。它只是停止了,用 delve 我也找不到任何东西。这是我的实际订阅者代码play.golang.org/p/TjnVgrGfyv

标签: go message-queue microservices nats.io


【解决方案1】:

无限的for 循环可能会使垃圾收集器挨饿:https://github.com/golang/go/issues/15442#issuecomment-214965471

我能够通过运行发布者来重现该问题。要解决此问题,我建议使用sync.WaitGroup。以下是我如何更新链接到 cmets 中的代码以完成它:

package main

import (
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/nats-io/go-nats"
)

// create wait group
var wg sync.WaitGroup

func main() {
    // add 1 waiter
    wg.Add(1)
    go createPublisher()

    // wait for wait group to complete
    wg.Wait()
}

func createPublisher() {

    log.Println("pub started")
    // mark wait group done after createPublisher completes
    defer wg.Done()

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        if errPub := nc.Publish("alenSub", msg); errPub != nil {
            panic(errPub)
        }

        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond * 1)
    }

    log.Println("pub finish")

    errFlush := nc.Flush()
    if errFlush != nil {
        panic(errFlush)
    }

    errLast := nc.LastError()
    if errLast != nil {
        panic(errLast)
    }

}

我建议类似地更新上述订阅者代码。

SubscribeQueueSubscriber 之间的主要区别在于,在Subscribe 中,所有订阅者都会收到来自其的所有消息。在QueueSubscribe 中,每条消息仅发送QueueGroup 中的一个订阅者。

有关 NATS 流式传输的其他功能的一些详细信息如下: https://nats.io/documentation/streaming/nats-streaming-intro/

我们看到 NATS 和 NATS Streaming 都用于从数据管道到控制平面的各种用例。您的选择应该由您的用例需求驱动。

【讨论】:

  • 你是对的,无限 for 循环是个大问题。非常感谢我的一天:)
【解决方案2】:

如上所述,删除 for{} 循环。替换为 runtime.Goexit()。

对于订阅者,您无需在 Go 例程中创建订阅者。异步订阅者已经有自己的 Go 回调例程。

还使用原子或互斥锁保护接收到的变量。

请参阅此处的示例。

https://github.com/nats-io/go-nats/tree/master/examples

【讨论】:

  • 感谢 derek 提供的示例。我正在搜索有关如何使用 QueueSubscribe 方法开发它的示例。但是因为是异步操作,我们不知道何时调用它的回调,在我认为的示例中也是如此,因为它是异步的,所以回调函数何时起作用并不确定。我们应该使用通道实现非阻塞解决方案还是有更好的方法?非常感谢您与 nats 的出色合作!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-08-20
  • 1970-01-01
  • 2022-01-07
  • 2018-04-04
  • 2017-04-28
  • 1970-01-01
相关资源
最近更新 更多