【发布时间】:2013-09-03 04:53:54
【问题描述】:
我有一个应用程序,其中每个 websocket 连接(在龙卷风打开回调中)都会为现有的 zmq.FORWARDER 设备创建一个 zmq.SUB 套接字。想法是从 zmq 接收数据作为回调,然后可以通过 websocket 连接中继到前端客户端。
https://gist.github.com/abhinavsingh/6378134
ws.py
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()
class ZMQPubSub(object):
def __init__(self, callback):
self.callback = callback
def connect(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect('tcp://127.0.0.1:5560')
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.callback)
def subscribe(self, channel_id):
self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)
class MyWebSocket(WebSocketHandler):
def open(self):
self.pubsub = ZMQPubSub(self.on_data)
self.pubsub.connect()
self.pubsub.subscribe("session_id")
print 'ws opened'
def on_message(self, message):
print message
def on_close(self):
print 'ws closed'
def on_data(self, data):
print data
def main():
application = Application([(r'/channel', MyWebSocket)])
application.listen(10001)
print 'starting ws on port 10001'
ioloop.start()
if __name__ == '__main__':
main()
forwarder.py
import zmq
def main():
try:
context = zmq.Context(1)
frontend = context.socket(zmq.SUB)
frontend.bind('tcp://*:5559')
frontend.setsockopt(zmq.SUBSCRIBE, '')
backend = context.socket(zmq.PUB)
backend.bind('tcp://*:5560')
print 'starting zmq forwarder'
zmq.device(zmq.FORWARDER, frontend, backend)
except KeyboardInterrupt:
pass
except Exception as e:
logger.exception(e)
finally:
frontend.close()
backend.close()
context.term()
if __name__ == '__main__':
main()
publish.py
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect('tcp://127.0.0.1:5559')
socket.send('session_id helloworld')
print 'sent data for channel session_id'
但是,我的 ZMQPubSub 类似乎根本没有接收任何数据。
我进一步实验并意识到我需要在ZMQPubSub中注册on_recv回调后调用ioloop.IOLoop.instance().start()。但是,这只会阻止执行。
我也尝试将 main.ioloop 实例传递给 ZMQStream 构造函数,但也无济于事。
有没有一种方法可以将ZMQStream 绑定到现有的main.ioloop 实例而不阻塞MyWebSocket.open 内的流?
【问题讨论】:
-
您使用的是什么 pyzmq 和 tornado 版本?我刚刚用两者的当前主人测试了你的代码,并且 PubSub 对象确实在接收消息。
-
tornado==3.1pyzmq==13.1.0 -
@minrk 我已经更新了示例代码,以反映我最终正在尝试的内容。您能否让这个示例代码在您的终端运行并在
on_data回调中接收数据? -
您的发布者真的只是尽可能快地绑定/发送/退出吗?因为慢订阅者永远不会收到该消息,因为在发送第一条消息之前订阅不会传播。您可以在
publisher.send之前添加一个睡眠,您的消息应该会到达。 -
@minrk 感谢您的睡眠提示,这肯定有帮助。是的,我的应用程序中的发布者将主要连接/发送/退出。但是,这种睡眠在这里真的是一件不受欢迎的事情。有没有办法确保可靠地传递消息而不涉及任何睡眠?