【问题标题】:Get subscriber filter from a ZMQ PUB socket从 ZMQ PUB 套接字获取订阅者过滤器
【发布时间】:2014-03-05 08:06:54
【问题描述】:

我在Monitoring section 的常见问题解答中注意到,无法获取已连接对等点的列表或在对等点连接/断开连接时收到通知。

这是否意味着也无法从上游反馈中知道 PUB/XPUB 套接字知道它应该发布哪些主题?或者有什么方法可以访问这些数据?

我知道 ZMQ >= 3.0 "supports PUB/SUB filtering at the publisher",但我真正想要的是过滤我的应用程序代码,使用 ZMQ 关于订阅哪些主题的知识。

我的用例是我想发布有关机器人状态的信息。一些主题涉及主要的硬件操作,例如切换 ADC 上的选择线以读取 IR 值。

我有一个在机器人上运行的发布者线程,它应该只在实际有订阅者时执行“读取”以获取 IR 数据。然而,由于我只能将一个字符串输入到我的 pub_sock.send 中,所以我总是不得不进行代价高昂的操作,即使 ZMQ 即将在没有订阅者的情况下丢弃该消息。

我有一个使用反向通道 REQ/REP 套接字发送主题信息的实现,我的应用可以在其发布循环中检查这些信息,从而只收集需要收集的数据。但这似乎很不优雅,因为 ZMQ 必须已经拥有我需要的数据,正如它在发布者处的过滤所证明的那样。

我注意到在这个mailing list message 中,OP 似乎能够看到发送到 XPUB 套接字的订阅消息。

但是,没有提及他们是如何做到的,而且我在文档中也没有看到任何这样的能力(仍在寻找)。也许他们只是在使用 Wireshark(查看上游订阅消息到 XPUB 套接字)。

【问题讨论】:

  • 我已经在#zeromq IRC 频道上发帖两次询问这个问题,有 6 小时的偏移量来帮助处理时区,但到目前为止没有收到任何回复。
  • 这仍然是一个悬而未决的问题,我正在积极寻求答案。
  • 您找到答案了吗?我还需要我的 PUB 服务器知道订阅了哪些过滤器。服务器不需要创建客户端不感兴趣的数据。(例如:假设 SUB 客户端只订阅纽约的天气数据,那么 PUB 服务器不应该为世界上所有其他城市创建数据,只是为了扔掉它。)
  • 听起来是同一个用例。您是否尝试过以下 frans 和 Freek Wiekmeijer 的答案?我不再活跃在相关代码库中,我们只是保留了反向通道黑客。
  • 谢谢。我最终在单独的 PUSH/PULL 套接字上手动重新发送订阅信息(从客户端到服务器)。这感觉有点像黑客,但它工作正常。这个单独的通道也可以用作来自客户端的心跳。客户会不时重新发送他们的订阅请求。然后,服务器可以简单地停止编译和发送在最后一分钟左右没有客户端重新订阅的主题。

标签: sockets zeromq publish-subscribe


【解决方案1】:

使用zmq.XPUB 套接字类型,有一种方法可以检测新订户和离开订户。以下代码示例显示了如何:

# Publisher side
import zmq

ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind("tcp://*:%d" % port_nr)
poller = zmq.Poller()
poller.register(xpub_socket)

events = dict(poller.poll(1000))
if xpub_socket in events:
    msg = xpub_socket.recv()
    if msg[0] == b'\x01':
        topic = msg[1:]
        print "Topic '%s': new subscriber" % topic
    elif msg[0] == b'\x00':
        topic = msg[1:]
        print "Topic '%s': subscriber left" % topic

请注意,zmq.XSUB 套接字类型的订阅方式与“普通”zmq.SUB 不同。代码示例:

# Subscriber side
import zmq
ctx = zmq.Context.instance()

# Subscribing of zmq.SUB socket
sub_socket = ctx.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # OK
sub_socket.connect("tcp://localhost:%d" % port_nr)

# Subscribing zmq.XSUB socket
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.connect("tcp://localhost:%d" % port_nr)
# xsub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
xsub_socket.send_multipart([b'\x01', b'sometopic']) # OK, triggers the subscribe event on the publisher

我还想指出zmq.XPUB_VERBOSE 套接字选项。如果设置,则在套接字上接收所有订阅事件。如果未设置,则会过滤重复订阅。另请参阅以下帖子:ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

【讨论】:

    【解决方案2】:

    至少对于 XPUB/XSUB 套接字情况,您可以通过手动转发和处理包来保存订阅状态:

    context = zmq.Context()
    
    xsub_socket = context.socket(zmq.XSUB)
    xsub_socket.bind('tcp://*:10000')
    xpub_socket = context.socket(zmq.XPUB)
    xpub_socket.bind('tcp://*:10001')
    
    poller = zmq.Poller()
    poller.register(xpub_socket, zmq.POLLIN)
    poller.register(xsub_socket, zmq.POLLIN)
    
    while True:
        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            break
    
        if xpub_socket in events:
            message = xpub_socket.recv_multipart()
    
            # HERE goes some subscription handle code which inspects
            # message
    
            xsub_socket.send_multipart(message)
        if xsub_socket in events:
            message = xsub_socket.recv_multipart()
            xpub_socket.send_multipart(message)
    

    (这是 Python 代码,但我猜 C/C++ 看起来很相似)

    我目前正在研究这个主题,我会尽快添加更多信息。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-23
      • 2016-12-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多