【问题标题】:Attaching ZMQStream with existing tornado ioloop将 ZMQStream 与现有的龙卷风 ioloop 连接
【发布时间】:2013-09-03 04:53:54
【问题描述】:

我有一个应用程序,其中每个 websocket 连接(在龙卷风打开回调中)都会为现有的 zmq.FORWARDER 设备创建一个 zmq.SUB 套接字。想法是从 zmq 接收数据作为回调,然后可以通过 websocket 连接中继到前端客户端。

https://gist.github.com/abhinavsingh/6378134

ws.py

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()

from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()

class ZMQPubSub(object):

    def __init__(self, callback):
        self.callback = callback

    def connect(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.connect('tcp://127.0.0.1:5560')
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.callback)

    def subscribe(self, channel_id):
        self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)

class MyWebSocket(WebSocketHandler):

    def open(self):
        self.pubsub = ZMQPubSub(self.on_data)
        self.pubsub.connect()
        self.pubsub.subscribe("session_id")
        print 'ws opened'

    def on_message(self, message):
        print message

    def on_close(self):
        print 'ws closed'

    def on_data(self, data):
        print data

def main():
    application = Application([(r'/channel', MyWebSocket)])
    application.listen(10001)
    print 'starting ws on port 10001'
    ioloop.start()

if __name__ == '__main__':
    main()

forwarder.py

import zmq

def main():
    try:
        context = zmq.Context(1)

        frontend = context.socket(zmq.SUB)
        frontend.bind('tcp://*:5559')
        frontend.setsockopt(zmq.SUBSCRIBE, '')

        backend = context.socket(zmq.PUB)
        backend.bind('tcp://*:5560')

        print 'starting zmq forwarder'
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(e)
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main()

publish.py

import zmq

if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://127.0.0.1:5559')
    socket.send('session_id helloworld')
    print 'sent data for channel session_id'

但是,我的 ZMQPubSub 类似乎根本没有接收任何数据。

我进一步实验并意识到我需要在ZMQPubSub中注册on_recv回调后调用ioloop.IOLoop.instance().start()。但是,这只会阻止执行。

我也尝试将 main.ioloop 实例传递给 ZMQStream 构造函数,但也无济于事。

有没有一种方法可以将ZMQStream 绑定到现有的main.ioloop 实例而不阻塞MyWebSocket.open 内的流?

【问题讨论】:

  • 您使用的是什么 pyzmq 和 tornado 版本?我刚刚用两者的当前主人测试了你的代码,并且 PubSub 对象确实在接收消息。
  • tornado==3.1 pyzmq==13.1.0
  • @minrk 我已经更新了示例代码,以反映我最终正在尝试的内容。您能否让这个示例代码在您的终端运行并在on_data 回调中接收数据?
  • 您的发布者真的只是尽可能快地绑定/发送/退出吗?因为慢订阅者永远不会收到该消息,因为在发送第一条消息之前订阅不会传播。您可以在publisher.send 之前添加一个睡眠,您的消息应该会到达。
  • @minrk 感谢您的睡眠提示,这肯定有帮助。是的,我的应用程序中的发布者将主要连接/发送/退出。但是,这种睡眠在这里真的是一件不受欢迎的事情。有没有办法确保可靠地传递消息而不涉及任何睡眠?

标签: python tornado zeromq


【解决方案1】:

在您现在完整的示例中,只需将转发器中的 frontend 更改为 PULL 套接字,将发布者套接字更改为 PUSH,它的行为应该与您预期的一样。

与此处相关的套接字选择的一般原则:

  • 当您想向所有准备好接收消息的人(可能没有人)发送消息时使用 PUB/SUB
  • 当您想向一个对等点发送消息时使用 PUSH/PULL,等待他们准备好

最初您可能只想要 PUB-SUB,但是一旦您开始查看每个套接字对,您就会意识到它们是非常不同的。 frontend-websocket 连接绝对是 PUB-SUB - 您可能有零对多的接收器,并且您只想将消息发送给在消息通过时碰巧可用的每个人。但后端不同 - 只有一个接收者,它肯定想要来自发布者的每条消息。

所以你有它 - 后端应该是 PULL 和前端 PUB。你所有的插座:

PUSH -> [PULL-PUB] -> SUB

publisher.py:socket是PUSH,连接到device.py中的backend

forwarder.py:backendPULLfrontendPUB ws.py: SUB 连接并订阅forwarder.frontend

在您的情况下,导致 PUB/SUB 在后端失败的相关行为是 slow joiner syndrome,即 described in The Guide。本质上,订阅者需要有限的时间来告诉发布者有订阅,所以如果你在打开 PUB 套接字后立即发送消息,很可能还没有被告知它有任何订​​阅者,所以它只是丢弃消息。

【讨论】:

  • 我在上一个回复中的解释可能不太清楚。我看到您提议用流媒体(推/拉)设备替换转发器(发布/订阅)(如果错了,请纠正我)。但是,我的用例将需要一个基于 pub/sub 的解决方案。每个 websocket 连接默认订阅至少 3 个频道 a) all_channels b) ${session_id}_channels c) ${session_id}_channel_${tab_id}。在实践中,c) 可以使用 push/pull,但 a) 和 b) 都需要 pub/sub 策略。有意义吗?
  • 不,我不建议您将其替换为 PUSH/PULL 设备 - 仅将设备中的 SUB 替换为 PULL,将发布者服务替换为 PUSH - 这样,整个链是: PUSH -> PULL-PUB -> SUB-websocket。您有一个 PULL-PUB 设备,而不是 PULL-PUSH 或 SUB-PUB。
【解决方案2】:

ZeroMq 订阅者必须订阅他们希望接收的消息;我在你的代码中没有看到。我相信 Python 的方式是这样的:

self.socket.setsockopt(zmq.SUBSCRIBE, "")

【讨论】:

  • 我忘了在上面的问题中添加那个位。 Websocket 连接确实订阅了一组 channel_id。我已经更新了上面的代码以反映相同的情况。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-10-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多