【问题标题】:How to deal with ZMQ sockets lack of thread safety?如何处理 ZMQ 套接字缺乏线程安全?
【发布时间】:2016-07-26 01:39:43
【问题描述】:

我在一些 Python 应用程序中使用 ZMQ 已经有一段时间了,但直到最近我才决定在 Go 中重新实现其中一个,我意识到 ZMQ 套接字不是线程安全的。

最初的 Python 实现使用如下所示的事件循环:

while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))

    for req in requests:
        rep = handle_request(req)
        if rep:
            replies.append(rep)
            requests.remove(req)

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        router.send(data)
        del replies[:]

问题是第一次通过时可能没有准备好回复,所以每当我有待处理的请求时,我必须以非常短的超时时间进行轮询,否则客户端将等待超过他们应该等待的时间,并且应用程序结束使用大量 CPU 进行轮询。

当我决定在 Go 中重新实现它时,我认为它会像这样简单,通过在轮询中使用无限超时来避免问题:

for {
    sockets, _ := poller.Poll(-1) 
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            go handleRequest(router, client_id, data)                
        }
    }
}

但这种理想的实现只有在我连接了一个客户端或负载较轻的情况下才有效。在重负载下,我在 libzmq 中得到随机断言错误。我尝试了以下方法:

  1. 按照zmq4 docs,我尝试在所有套接字操作上添加一个sync.Mutex 和锁定/解锁。它失败。我认为这是因为 ZMQ 使用自己的线程进行刷新。

  2. 创建一个用于轮询/接收和一个用于发送的 goroutine,并以与我在 Python 版本中使用 req/rep 队列相同的方式使用通道。它失败了,因为我仍在共享套接字。

  3. 与 2 相同,但设置为 GOMAXPROCS=1。它失败了,而且吞吐量非常有限,因为回复被推迟到Poll() 调用返回。

  4. 使用如 2 中的 req/rep 通道,但使用 runtime.LockOSThread 将所有套接字操作保持在与套接字相同的线程中。有和上面一样的问题。它没有失败,但吞吐量非常有限。

  5. 与 4 相同,但使用 Python 版本的轮询超时策略。它可以工作,但与 Python 版本有同样的问题。

  6. 共享上下文而不是套接字,并在单独的 goroutine 中创建一个用于发送的套接字和一个用于接收的套接字,与通道通信。它可以工作,但我必须重写客户端库以使用两个套接字而不是一个。

  7. 摆脱 zmq 并使用原始 TCP 套接字,它们是线程安全的。它工作得很好,但我还必须重写客户端库。

所以,看起来 6 是 ZMQ 真正打算使用的方式,因为这是我让它与 goroutines 无缝工作的唯一方法,但我想知道是否还有其他方法我没有尝试过。有什么想法吗?


更新

有了这里的答案,我意识到我可以将一个inproc PULL 套接字添加到轮询器,并让一个 goroutine 连接并推送一个字节来摆脱无限等待。它不像这里建议的解决方案那样通用,但它可以工作,我什至可以将它反向移植到 Python 版本。

【问题讨论】:

  • 你在 Go 端使用什么 ZMQ 库?
  • @sberry github.com/pebbe/zmq4,链接在#1
  • 我是 Go 世界的新手 - 除了一致性之外,还有其他原因会使用 inproc 套接字而不是使用 ZMQ 进行外部通信和使用 Go 通道进行内部通信的混合系统吗?跨度>
  • @Jason 因为我不能将通道与 ZMQ 轮询器一起使用。如果我可以将 ZMQ 套接字与 Go 的选择一起使用,或者将通道与 ZMQ 轮询器一起使用,那就太好了。

标签: sockets go zeromq


【解决方案1】:

使用 pebbe/zmq4 实现此目的的幸运方法是使用 Reactor。 Reactors 能够监听 Go 通道,但您不想这样做,因为它们通过使用轮询超时定期轮询通道来做到这一点,这会重新引入您在您的蟒蛇版本。相反,您可以使用 zmq inproc 套接字,一端由反应器持有,另一端由从通道传递数据的 goroutine 持有。复杂,冗长,令人不快,但我已经成功使用了。

【讨论】:

  • 我明白了,但我刚刚意识到我可以简单地使用inproc 套接字来中断poller.Poll(-1) 调用,我什至不必使用反应器。谢谢。
  • @PedroWerneck 聪明。很高兴我能以某种方式提供帮助。
【解决方案2】:

opened an issuea 1.5 年前将https://github.com/vaughan0/go-zmq/blob/master/channels.go 的端口引入pebbe/zmq4。最终作者决定反对它,但我们已经在生产中使用它(在非常繁重的工作负载下)很长时间了。

这是必须添加到 pebbe/zmq4 包中的文件的gist(因为它向 Socket 添加了方法)。这可以重写为 Socket 接收器上的方法将 Socket 作为参数,但由于我们无论如何都提供我们的代码,这是一个简单的方法。

基本用法是像平常一样创建您的Socket(例如称为s)然后您可以:

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

现在您有两个 [][]byte 类型的通道,您可以在 goroutine 之间使用它们,但是一个 goroutine - 在通道抽象中管理,负责管理 Poller 并与套接字通信。

【讨论】:

  • 忘记我之前的评论。我现在明白了。本质上,它的作用是使用inproc 套接字打破无限超时并将所有内容包装在通道中以使其更加无缝。好东西。谢谢。
猜你喜欢
  • 2021-06-20
  • 2011-03-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-11-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多