【问题标题】:How can a pyzmq SUB subscriber detect an offline PUB publisher?pyzmq SUB 订阅者如何检测离线 PUB 发布者?
【发布时间】:2016-03-02 11:11:39
【问题描述】:

SUB-scriber 怎么能真正确定,另一边有一个 PUB-lisher,否则根本无法启动? p>

详细信息:
虽然一切都可以使用我的 SUB-scriber 代码和正在运行的远程 PUB-lisher,当我.connect()+订阅我的客户端到一个虚拟服务器说localhost时,它没有注意到没有运行PUB-lishers,它只是启动并等待。

我使用标准程序来做:

    sock = context.socket(zmq.SUB)
    sock.connect("tcp://{}:{}".format(host, port))
    topic_filter = 'blah'
    sock.setsockopt_string(zmq.SUBSCRIBE, topic_filter)
    # here should come something that warns about offline publisher...

【问题讨论】:

    标签: python zeromq publish-subscribe pyzmq


    【解决方案1】:

    Sole PUB/SUB 无法创建这个

    ZeroMQ 在概念上和实践上都是一个强大的工具箱。不应尝试“弯曲”库原语——这些原语本身可以被理解为更复杂的消息传递和信令目的的构建块,而不是现实生活中的解决方案——以便做那些最初的S可扩展F正规C通信P样式设计原型中并未主要涵盖。

    PUB/SUB 原型只是 PUB/SUB

    PUB-向所有在场的SUBs(如果有的话)广播,并且每个人(如果有的话)SUB会收听已经发生的事情到达(如果有任何东西到达,它会应用过滤器......是的,在SUB-side 加载网络上过滤)。


    仍然 - 人们可以设计一种多原型方法来解决这个问题。

    让我为最简单的方案画一个基本方案。

    假设您完全控制了双方(设计方面和实施方面)。

    # /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
    #
    # [Side A]
    #        |_____aKnockSOCK = context.socket( zmq.PAIR )
    #        |     + .setsockopt( zmq.CONFLATE )
    #        |     + .bind()
    #        |
    #        |_____anEmitSOCK = context.socket( zmq.PUB )
    #              + .bind()
    #
    isNotReceivedSigEXIT  = False
    while( isNotReceviedSigEXIT ):
           if ( 0 == aKnockSOCK.poll( aFastPollIN, zmq.POLLIN ) ):
               # nobody new knocking to setup ...
               # ------------------------------------------------
               # do the main job,
               #    with countdown segmentation
               #    to escape to the outer loop
               #    so as to check for new SUB-s
               #    knocking as they come to the show
               # ------------------------------------------------
           else:
               aKnockSOCK.send( "aKnockSOCK ACK: service is ready. Go .connect()" )
    #
    # --------------------------------------------------------
    # ZeroMQ resources graceful termination 
    #        socket closes + context dismantle & clean exit
    
    
    
    # /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
    #
    #[Side B]
    #        |_____aKnockSOCK = context.socket( zmq.PAIR )
    #        |     + .setsockopt( zmq.CONFLATE )
    #        |
    #        |_____aRecvrSOCK = context.socket( zmq.SUB )
    #
    #
    isToBreakEXIT = False
    for nthAttempt in range( 10 ):
        if ( isToBreakEXIT ):
             break
        try:
              aKnockSOCK.connect( ... )
        except:
              ReportConnectERROR( ... )
              sleep( ... )
              continue
        # ---------------------------------- once local aKnockSOCK got instantiated
        for kthPoll in range( 10 ):
            if ( 0 == aKnockSOCK.poll( aLongPollIN, zmq.POLLIN ) ):
               sleep( thisCouldBeAddedToLongPollIN )
            else:
               # ----------------------------------------- .recv() + dismantle
               aKnockSOCK.recv()
               aKnockSOCK.setsockopt( zmq.LINGER, 0 )
               aKnockSOCK.close()
               # ----------------------------------------- .connect() + use
               aRecvrSOCK.connect( ... )
               aRecvrSOCK.setsockopt( zmq.SUBSCRIBE, ... )
               # ----------------------------------------- [Side B] main job start
               # ----------------------------------------- [Side B] main job end
               isToBreakEXIT = True
               break
        pass
    # --------------------------------------------------------
    # ZeroMQ resources graceful termination 
    #        socket closes + context dismantle & clean exit
    

    【讨论】:

      猜你喜欢
      • 2020-09-26
      • 1970-01-01
      • 2011-08-11
      • 1970-01-01
      • 1970-01-01
      • 2018-11-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多