【问题标题】:zeromq: reset REQ/REP socket statezeromq:重置 REQ/REP 套接字状态
【发布时间】:2015-01-10 23:31:18
【问题描述】:

当您使用简单的 ZeroMQ REQ/REP 模式时,您依赖于固定的 send()->recv() / recv()->send() 序列。 正如this 文章所描述的,当参与者在请求中间断开连接时,您会遇到麻烦,因为这样您就不能从另一个连接接收下一个请求重新开始,但状态机会强制您发送请求到断开一个。

自从写了上述文章以来,是否出现了一种更优雅的方法来解决这个问题?

重新连接是解决这个问题的唯一方法(除了不使用 REQ/REP 而是使用另一种模式)

【问题讨论】:

  • 您可能希望切换到guide 中描述的基于 ROUTER/DEALER 的可靠请求响应 (AKA pirate) 模式之一

标签: c++ sockets zeromq


【解决方案1】:

由于接受的答案对我来说似乎非常难过,我做了一些研究,发现我们需要的一切实际上都在文档中。

具有正确参数的 .setsockopt() 可以帮助您重置套接字状态机,而不会残酷地破坏它并在前一个尸体之上重建另一个。

(是的,我喜欢这张图片)。

ZMQ_REQ_CORRELATE: 将回复与请求匹配
REQ 套接字的默认行为是依靠消息的顺序来匹配请求和响应,这通常就足够了。当此选项设置为 1 时,REQ 套接字将在传出消息前加上一个包含请求 ID 的额外帧。这意味着完整的消息是(request id0user frames…)。 REQ 套接字将丢弃所有不以这两个帧开头的传入消息。
选项值类型int
选项值单位0,1
默认值0
适用的套接字类型ZMQ_REQ

ZMQ_REQ_RELAXED:放宽请求和回复之间的严格交替
默认情况下,REQ 套接字不允许使用zmq_send(3) 发起新请求,直到收到对前一个请求的回复。当设置为 1 时,允许发送另一条消息,并且会断开与预期回复的对等方的底层连接,从而触发支持它的传输重新连接尝试。请求-回复状态机被重置,新的请求被发送到下一个可用的对等点。
如果设置为 1,还需要启用 ZMQ_REQ_CORRELATE 以确保正确匹配请求和回复。否则,对中止请求的延迟回复可能会被报告为对取代请求的回复。
选项值类型int
选项值单位0,1
默认值0
适用的套接字类型ZMQ_REQ

A complete documentation is here

【讨论】:

    【解决方案2】:

    更新:我在这里用这个优秀的资源更新这个答案:https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ 套接字编程很复杂,所以请查看这篇文章中的参考资料。

    这里的答案似乎都不准确或有用。 OP 不是在寻找有关 BSD 套接字编程的信息。他正试图弄清楚如何在 REP 套接字上的 ZMQ 中稳健地处理 accept()ed 客户端套接字故障,以防止服务器挂起或崩溃。

    如前所述——这个问题很复杂,因为 ZMQ 试图假装服务器监听()套接字与接受()套接字相同(并且文档中没有描述如何在此类套接字上设置基本超时。)

    我的回答:

    在对代码进行大量挖掘之后,传递给 accept()ed socks 的唯一相关套接字选项似乎是来自父级 listener()er 的保持活动选项。所以解决方法是在调用send或recv之前在listen socket上设置如下选项:

    void zmq_setup(zmq::context_t** context, zmq::socket_t** socket, const char* endpoint)
    {
        // Free old references.
        if(*socket != NULL)
        {
            (**socket).close();
            (**socket).~socket_t();
        }
        if(*context != NULL)
        {
            // Shutdown all previous server client-sockets.
            zmq_ctx_destroy((*context));
            (**context).~context_t();
        }
    
        *context = new zmq::context_t(1);
        *socket = new zmq::socket_t(**context, ZMQ_REP);
    
        // Enable TCP keep alive.
        int is_tcp_keep_alive = 1;
        (**socket).setsockopt(ZMQ_TCP_KEEPALIVE, &is_tcp_keep_alive, sizeof(is_tcp_keep_alive));
    
        // Only send 2 probes to check if client is still alive.
        int tcp_probe_no = 2;
        (**socket).setsockopt(ZMQ_TCP_KEEPALIVE_CNT, &tcp_probe_no, sizeof(tcp_probe_no));
    
        // How long does a con need to be "idle" for in seconds.
        int tcp_idle_timeout = 1;
        (**socket).setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, &tcp_idle_timeout, sizeof(tcp_idle_timeout));
    
        // Time in seconds between individual keep alive probes.
        int tcp_probe_interval = 1;
        (**socket).setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, &tcp_probe_interval, sizeof(tcp_probe_interval));
    
        // Discard pending messages in buf on close.
        int is_linger = 0;
        (**socket).setsockopt(ZMQ_LINGER, &is_linger, sizeof(is_linger));
    
        // TCP user timeout on unacknowledged send buffer
        int is_user_timeout = 2;
        (**socket).setsockopt(ZMQ_TCP_MAXRT, &is_user_timeout, sizeof(is_user_timeout));
    
        // Start internal enclave event server.
        printf("Host: Starting enclave event server\n");
        (**socket).bind(endpoint);
    }
    

    这样做是告诉操作系统积极检查客户端套接字是否超时,并在客户端没有及时返回心跳时获取它们以进行清理。结果是操作系统会将 SIGPIPE 发送回您的程序,并且套接字错误将冒泡发送/接收 - 修复挂起的服务器。然后你还需要做两件事:

    1.处理 SIGPIPE 错误,使程序不会崩溃

    #include <signal.h>
    #include <zmq.hpp>
    
    // zmq_setup def here [...]
    
    int main(int argc, char** argv) 
    {
        // Ignore SIGPIPE signals.
        signal(SIGPIPE, SIG_IGN);
        // ... rest of your code after
    
        // (Could potentially also restart the server
        // sock on N SIGPIPEs if you're paranoid.)
    
        // Start server socket.
        const char* endpoint = "tcp://127.0.0.1:47357";
        zmq::context_t* context;
        zmq::socket_t* socket;
        zmq_setup(&context, &socket, endpoint);
    
        // Message buffers.
        zmq::message_t request;
        zmq::message_t reply;
    
        // ... rest of your socket code here
    }
    

    2。检查 send 或 recv 返回的 -1 并捕获 ZMQ 错误。

    // E.g. skip broken accepted sockets (pseudo-code.)
    while (1):
    {
        try
        {
            if ((*socket).recv(&request)) == -1)
                throw -1;
        }
        catch (...)
        {
            // Prevent any endless error loops killing CPU.
            sleep(1)
    
            // Reset ZMQ state machine.
            try
            {
                zmq::message_t blank_reply = zmq::message_t();
                (*socket).send (blank_reply);
            }
            catch (...)
            {
                1;
            } 
    
            continue;
        }
    

    注意到尝试在套接字失败时发送回复的奇怪代码吗?在 ZMQ 中,REP 服务器“套接字”是另一个程序的端点,该程序向该服务器创建 REQ 套接字。结果是,如果您在客户端挂起的 REP 套接字上执行 recv,服务器 sock 会陷入中断的接收循环,它将永远等待接收有效回复。

    要强制更新状态机,请尝试发送回复。 ZMQ 检测到套接字已损坏,并将其从队列中删除。服务器套接字变得“不卡住”,下一个 recv 调用从队列中返回一个新的客户端。

    要在异步客户端上启用超时(在 Python 3 中),代码如下所示:

    import asyncio
    import zmq
    import zmq.asyncio
    
    @asyncio.coroutine
    def req(endpoint):
        ms = 2000 # In milliseconds.
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.SNDTIMEO, ms)
        sock.setsockopt(zmq.RCVTIMEO, ms)
        sock.setsockopt(zmq.LINGER, ms) # Discard pending buffered socket messages on close().
        sock.setsockopt(zmq.CONNECT_TIMEOUT, ms)
    
        # Connect the socket.
        # Connections don't strictly happen here.
        # ZMQ waits until the socket is used (which is confusing, I know.)
        sock.connect(endpoint)
    
        # Send some bytes.
        yield from sock.send(b"some bytes")
    
        # Recv bytes and convert to unicode.
        msg = yield from sock.recv()
        msg = msg.decode(u"utf-8")  
    

    现在,当出现问题时,您会遇到一些失败场景。

    顺便说一句——如果有人好奇的话——Linux 中 TCP 空闲超时的默认值似乎是 7200 秒或 2 小时。因此,您将等待一个 很长 时间让挂起的服务器执行任何操作!

    来源:

    免责声明:

    我已经测试了这段代码,它似乎可以工作,但是 ZMQ 确实使测试变得相当复杂,因为客户端在失败时重新连接?如果有人想在生产中使用这个解决方案,我建议先编写一些基本的单元测试。

    服务器代码也可以通过线程或轮询得到很大改进,以便能够同时处理多个客户端。就目前而言,恶意客户端可以暂时从服务器占用资源(3 秒超时),这并不理想。

    【讨论】:

      【解决方案3】:

      好消息是,从 ZMQ 3.0 及更高版本(现代)开始,您可以在套接字上设置超时。正如其他人在其他地方指出的那样,您必须在创建套接字之后但在连接之前执行此操作:

      zmq_req_socket.setsockopt( zmq.RCVTIMEO, 500 ) # milliseconds

      然后,当您实际尝试接收回复时(在您向 REP 套接字发送消息之后),您可以捕获如果超时将被断言的错误:

       try:
         send( message, 0 )
         send_failed = False
      
       except zmq.Again:
         logging.warning( "Image send failed." )
         send_failed = True
      

      但是!当这种情况发生时,正如在其他地方观察到的那样,您的套接字将处于一个有趣的状态,因为它仍然会等待响应。在这一点上,除了重新启动套接字之外,我找不到任何可靠的工作。请注意,如果您断开()套接字然后重新连接()它,它仍然会处于这种不良状态。因此你需要

      def reset_my_socket:
        zmq_req_socket.close()
        zmq_req_socket = zmq_context.socket( zmq.REQ )
        zmq_req_socket.setsockopt( zmq.RCVTIMEO, 500 ) # milliseconds
        zmq_req_socket.connect( zmq_endpoint )
      

      您还会注意到,因为我关闭()了套接字,接收超时选项“丢失”,所以在新套接字上设置它很重要。

      我希望这会有所帮助。我希望这不会成为这个问题的最佳答案。 :)

      【讨论】:

      • 这仍然使 REP 套接字处于错误状态,因为它试图发送一个从未收到的回复。
      【解决方案4】:

      我现在实际上正在研究这个,因为我正在改造旧系统。

      我经常遇到“需要”了解连接状态的代码。但问题是我想转向图书馆提倡的消息传递范式。

      我找到了以下函数:zmq_socket_monitor

      它的作用是监视传递给它的套接字并生成事件,然后将这些事件传递给“inproc”端点——此时您可以添加处理代码来实际执行某些操作。

      这里还有一个例子(实际是测试代码):github

      目前(可能在本周末)我没有任何具体的代码可以提供,但我的目的是响应连接和断开连接,以便我可以实际执行所需的任何逻辑重置。

      希望这会有所帮助,尽管引用了 4.2 文档,但我使用的是 4.0.4,它似乎具有该功能 也是。

      请注意,我注意到您在上面谈到了 python,但问题被标记为 C++,所以这就是我的答案来自...

      【讨论】:

        【解决方案5】:

        对此有一个解决方案,那就是为所有调用添加超时。由于 ZeroMQ 本身并没有真正提供简单的超时功能,我建议使用 ZeroMQ 套接字的子类,它为所有重要调用添加超时参数。

        因此,您可以调用 s.recv(timeout=5.0) 而不是调用 s.recv(),如果在 5 秒窗口内没有返回响应,它将返回 None 并停止阻塞。当我遇到这个问题时,我对此进行了徒劳的尝试。

        【讨论】:

        • 我目前没有 zeromq 4 来用 python 测试它,但我相信你推荐的这种方法可以解决不定式等待的问题,但不能解决状态机的问题。然后您就可以再次拨打recv(),但不能拨打send()。然后你会得到一个像zmq.error.ZMQError: Operation cannot be accomplished in current state 这样的异常。如果我错了请告诉我(无论如何我都会尝试但我必须安装最新版本的pyzmq
        • 我的经历和你描述的frans一样。如果 REQ 套接字发出了 send() 调用,但从未收到响应,那么它会一直等待来自某个地方的响应。所以我可以凭经验证实你的直觉。
        • 正如我上面评论的那样,即使您重置了 REQ 套接字,REP 套接字仍然会卡在错误状态。它卡在回复模式,而不是接收模式。
        猜你喜欢
        • 2016-11-17
        • 2014-01-03
        • 1970-01-01
        • 2020-07-17
        • 2013-06-07
        • 2014-10-11
        • 1970-01-01
        • 2017-02-13
        • 1970-01-01
        相关资源
        最近更新 更多