【问题标题】:How to Log Receive Message in ZMQ Proxy?如何在 ZMQ 代理中记录接收消息?
【发布时间】:2019-10-02 09:46:15
【问题描述】:

在 ZMQ Proxy 中,我们有 2 种类型的套接字,DEALER 和 ROUTER。另外,我尝试使用捕获套接字,但根据我所寻找的内容,它没有工作。

我正在寻找一种方法来记录我的代理服务器收到的消息。

【问题讨论】:

    标签: c++ proxy zeromq


    【解决方案1】:

    Q一种记录我的代理服务器接收到的消息的方法。

    最简单的方法是通过 ManInTheMiddle-“捕获”套接字使用 API v4+ 直接支持的日志记录:

    // [ROUTER]--------------------------------------+++++++
    //                                               |||||||
    // [DEALER]---------------*vvvvvvvv             *vvvvvvv
    int zmq_proxy (const void *frontend, const void *backend, const void *capture);
    // [?]---------------------------------------------------------------*^^^^^^^
    

    capture 应该是 { ZMQ_PUB | ZMQ_DEALER | ZMQ_PUSH | ZMQ_PAIR }

    如果 capture 套接字不是 NULL,则代理将发送所有在 frontendbackend,连接到 capture 套接字。

    如果这个 ZeroMQ API 授权不符合您的期望,请随时根据需要以足够详细的方式表达您的期望(并实现“外部”capture-socket 有效负载 { message-content | socket_monitor()基于}的过滤或者可以设计一个全新的、用户定义的日志代理,您表达的功能将通过使用您的自定义用例特定要求来实现,在您的应用程序特定代码中实现,诉诸重新对所有 DEALER-inbound/outbound-ROUTER 消息传递和日志过滤/处理逻辑使用干净且简单的 ZeroMQ API。)

    我想不出其他方式来解决这个任务。

    【讨论】:

    • 是的,正好通过 2 个额外的套接字捕获它解决了! pushpull 套接字。非常感谢:)
    • 很高兴听到这个消息,先生。
    【解决方案2】:

    它也适用于一对 PAIR 插座。一旦一对套接字的一端连接到捕获套接字,消息就会发送到捕获套接字并发送到代理的另一端。

    http://zguide.zeromq.org/page:all#ZeroMQ-s-Built-In-Proxy-Functionhttp://api.zeromq.org/3-2:zmq-proxyhttp://zguide.zeromq.org/page:all#Pub-Sub-Tracing-Espresso-Pattern

    帮了我。

    python中的这段代码演示了它:

        import zmq, threading, time
        
        def peer_run(ctx):
            """ this is the run method of the PAIR thread that logs the messages
            going through the broker """
            sock = ctx.socket(zmq.PAIR)
            sock.connect("inproc://peer") # connect to the caller
            sock.send(b"") # signal the caller that we are ready
            while True:
                try:
                    topic = sock.recv_string()
                    obj = sock.recv_pyobj()
                except Exception:
                    topic = None
                    obj = sock.recv()
                print(f"\n !!! peer_run captured message with topic {topic}, obj {obj}. !!!\n")
        
        def proxyrun():
            """ zmq broker run method in separate thread because zmq.proxy blocks """ 
            xpub = ctx.socket(zmq.XPUB)
            xpub.bind(xpub_url)
            xsub = ctx.socket(zmq.XSUB)
            xsub.bind(xsub_url)
            zmq.proxy(xpub, xsub, cap)
        
        def pubrun():
            """ publisher run method in a separate thread, publishes 5 messages with topic 'Hello'"""
            socket = ctx.socket(zmq.PUB)
            socket.connect(xsub_url)
            for i in range(5):
                socket.send_string(f"Hello {i}", zmq.SNDMORE)
                socket.send_pyobj({'a' : 123})
                time.sleep(0.01)
        
        ctx = zmq.Context()
        xpub_url = "ipc://xpub"
        xsub_url = "ipc://xsub"
        #xpub_url = "tcp://127.0.0.1:5567"
        #xsub_url = "tcp://127.0.0.1:5568"
        # set up the capture socket pair
        cap = ctx.socket(zmq.PAIR)
        cap.bind("inproc://peer")
        cap_th = threading.Thread(target=peer_run, args=(ctx,), daemon=True)
        cap_th.start()
        cap.recv() # wait for signal from peer thread
        print("cap received message from peer, proceeding.")
        # start the proxy
        th_proxy=threading.Thread(target=proxyrun, daemon=True)
        th_proxy.start()
        # create req/rep socket just to prove that pub/sub can run alongside it
        zmq_rep_sock = ctx.socket(zmq.REP)
        zmq_rep_sock.bind("ipc://ghi")
        # create sub socket and connect it to proxy's pub socket
        zmq_sub_sock = ctx.socket(zmq.SUB)
        zmq_sub_sock.connect(xpub_url)
        zmq_sub_sock.setsockopt(zmq.SUBSCRIBE, b"Hello")
        # create the poller
        poller = zmq.Poller()
        poller.register(zmq_rep_sock, zmq.POLLIN)
        poller.register(zmq_sub_sock, zmq.POLLIN)         
        # create publisher thread and start it
        th_pub = threading.Thread(target=pubrun, daemon=True)
        th_pub.start()
        # receive publisher's messages ordinarily
        while True:
            events = dict(poller.poll())
            print(f"received events: {events}")
            if zmq_rep_sock in events:
                message = zmq_rep_sock.recv_pyobj()
                print(f"received zmq_rep_sock {message}")
            elif zmq_sub_sock in events:
                topic = zmq_sub_sock.recv_string()
                message = zmq_sub_sock.recv_pyobj()
                print(f"received zmq_sub_sock {topic} , {message}")
    

    输出

    cap received message from peer, proceeding.
    
     !!! peer_run captured message with topic None, obj b'\x80\x03}q\x00X\x01\x00\x00\x00aq\x01K{s.'. !!!
    
    received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
    received zmq_sub_sock Hello 1 , {'a': 123}
    
     !!! peer_run captured message with topic Hello 2, obj {'a': 123}. !!!
    
    received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
    received zmq_sub_sock Hello 2 , {'a': 123}
    
     !!! peer_run captured message with topic Hello 3, obj {'a': 123}. !!!
    
    received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
    received zmq_sub_sock Hello 3 , {'a': 123}
    
     !!! peer_run captured message with topic Hello 4, obj {'a': 123}. !!!
    
    received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
    received zmq_sub_sock Hello 4 , {'a': 123}
    

    注意慢加入问题,因此发布者中的睡眠命令/

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-12-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多