【发布时间】:2016-07-26 01:39:43
【问题描述】:
我在一些 Python 应用程序中使用 ZMQ 已经有一段时间了,但直到最近我才决定在 Go 中重新实现其中一个,我意识到 ZMQ 套接字不是线程安全的。
最初的 Python 实现使用如下所示的事件循环:
while running:
socks = dict(poller.poll(TIMEOUT))
if socks.get(router) == zmq.POLLIN:
client_id = router.recv()
_ = router.recv()
data = router.recv()
requests.append((client_id, data))
for req in requests:
rep = handle_request(req)
if rep:
replies.append(rep)
requests.remove(req)
for client_id, data in replies:
router.send(client_id, zmq.SNDMORE)
router.send(b'', zmq.SNDMORE)
router.send(data)
del replies[:]
问题是第一次通过时可能没有准备好回复,所以每当我有待处理的请求时,我必须以非常短的超时时间进行轮询,否则客户端将等待超过他们应该等待的时间,并且应用程序结束使用大量 CPU 进行轮询。
当我决定在 Go 中重新实现它时,我认为它会像这样简单,通过在轮询中使用无限超时来避免问题:
for {
sockets, _ := poller.Poll(-1)
for _, socket := range sockets {
switch s := socket.Socket; s {
case router:
msg, _ := s.RecvMessage(0)
client_id := msg[0]
data := msg[2]
go handleRequest(router, client_id, data)
}
}
}
但这种理想的实现只有在我连接了一个客户端或负载较轻的情况下才有效。在重负载下,我在 libzmq 中得到随机断言错误。我尝试了以下方法:
按照zmq4 docs,我尝试在所有套接字操作上添加一个sync.Mutex 和锁定/解锁。它失败。我认为这是因为 ZMQ 使用自己的线程进行刷新。
创建一个用于轮询/接收和一个用于发送的 goroutine,并以与我在 Python 版本中使用 req/rep 队列相同的方式使用通道。它失败了,因为我仍在共享套接字。
与 2 相同,但设置为
GOMAXPROCS=1。它失败了,而且吞吐量非常有限,因为回复被推迟到Poll()调用返回。使用如 2 中的 req/rep 通道,但使用
runtime.LockOSThread将所有套接字操作保持在与套接字相同的线程中。有和上面一样的问题。它没有失败,但吞吐量非常有限。与 4 相同,但使用 Python 版本的轮询超时策略。它可以工作,但与 Python 版本有同样的问题。
共享上下文而不是套接字,并在单独的 goroutine 中创建一个用于发送的套接字和一个用于接收的套接字,与通道通信。它可以工作,但我必须重写客户端库以使用两个套接字而不是一个。
摆脱 zmq 并使用原始 TCP 套接字,它们是线程安全的。它工作得很好,但我还必须重写客户端库。
所以,看起来 6 是 ZMQ 真正打算使用的方式,因为这是我让它与 goroutines 无缝工作的唯一方法,但我想知道是否还有其他方法我没有尝试过。有什么想法吗?
更新
有了这里的答案,我意识到我可以将一个inproc PULL 套接字添加到轮询器,并让一个 goroutine 连接并推送一个字节来摆脱无限等待。它不像这里建议的解决方案那样通用,但它可以工作,我什至可以将它反向移植到 Python 版本。
【问题讨论】:
-
你在 Go 端使用什么 ZMQ 库?
-
@sberry github.com/pebbe/zmq4,链接在#1
-
我是 Go 世界的新手 - 除了一致性之外,还有其他原因会使用 inproc 套接字而不是使用 ZMQ 进行外部通信和使用 Go 通道进行内部通信的混合系统吗?跨度>
-
@Jason 因为我不能将通道与 ZMQ 轮询器一起使用。如果我可以将 ZMQ 套接字与 Go 的选择一起使用,或者将通道与 ZMQ 轮询器一起使用,那就太好了。