【问题标题】:How can I clean up properly when recv is blocking?recv 阻塞时如何正确清理?
【发布时间】:2013-04-19 05:11:30
【问题描述】:

考虑下面的示例代码(作为示例,我快速输入它,如果有错误也没关系 - 我对理论感兴趣)。

bool shutDown = false; //global

int main()
{
  CreateThread(NULL, 0, &MessengerLoop, NULL, 0, NULL);
  //do other programmy stuff...
}


DWORD WINAPI MessengerLoop( LPVOID lpParam )
{
  zmq::context_t context(1);
  zmq::socket_t socket (context, ZMQ_SUB);
  socket.connect("tcp://localhost:5556");
  socket.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    zmq_msg_recv (&getMessage, socket, 0); //This line will wait forever for a message
    processMessage(getMessage); 
  }
}

创建一个线程来等待传入的消息并适当地处理它们。线程一直在循环,直到 shutDown 设置为 true。

在 ZeroMQ 中,Guide 明确说明了必须清理的内容,即消息、套接字和上下文。

我的问题是:由于recv 将永远等待消息,阻塞线程,如果从未收到消息,我该如何安全地关闭该线程?

【问题讨论】:

  • zmq_msg_recv中使用ZMQ_DONTWAIT标志,并在短时间内添加手动Sleep?所以使用更多的轮询方法?
  • 我不知道 zmg_* API,但是在普通的套接字 API 中,在另一个线程中关闭套接字就足够了,并且 recv 失败退出。为此,您需要连接到另一个线程中的套接字,以使 socket 变量可用于关闭。 MessengerLoop 必须只包含无限的recv 循环。
  • @AlexFarber 我曾考虑过这一点,但除非我弄错了,否则不会导致所有消息处理都在同一个 MessengerLoop 线程中完成吗?在这种情况下,我必须设置某种信号系统来区分“我正在等待消息,所以是的,你可以终止我”和“我正忙于处理某些事情,不要终止我”。这似乎比基于轮询的系统更混乱。我对这个网络消息问题很陌生,是我遗漏了什么还是这是正常的做事方式?
  • 我不熟悉 zmq_* 函数,但是查看您发布的代码,我认为当套接字关闭时,循环中唯一可能失败的行是 zmq_msg_recv 调用。但这只是我的猜测。一旦这个调用失败,退出循环和整个线程。
  • 另外,从主线程关闭socket后,需要等待线程句柄,当线程真正退出时——通用Win32多线程编程规则。

标签: c++ zeromq


【解决方案1】:

阻塞调用会以几种方式退出。首先,这取决于您的语言和绑定,中断(Ctrl-C、SIGINT、SIGTERM)将退出调用。您将返回(同样,取决于您的绑定)错误或空消息(libzmq 返回 EINTR 错误)。

其次,如果你在另一个线程中终止上下文,阻塞调用也会退出(libzmq 返回 ETERM 错误)。

第三,您可以在套接字上设置超时,以便它在任何情况下都会在超时后返回,如果没有数据。我们不经常这样做,但在某些情况下它会很有用。

最后,我们在实践中所做的绝不是阻塞接收,而是使用 zmq_poll 找出套接字何时有消息等待,然后从这些套接字接收。这就是您向外扩展以处理更多套接字的方式。

【讨论】:

  • 您还会对 REQ - REP 对使用轮询吗?
  • 当然。阻塞读取在仅使用一个套接字的简单任务中很好,但大多数实际任务使用多个套接字。例如,您可以使用 REQ 套接字接收工作,然后每隔几分钟将您的状态记录到 PUB 套接字。
  • 这正是我已经实现的 :) 很高兴知道我在正确的轨道上。谢谢!
【解决方案2】:

您可以使用非阻塞调用标志ZMQ_DONTWAIT

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    while(-1 == zmq_msg_recv(&getMessage, socket, ZMQ_DONTWAIT))
    {
      if (EAGAIN != errno || shutDown)
      {
        break;
      }
      Sleep(100);
    }
    processMessage(getMessage); 
  }

【讨论】:

  • 两个小逻辑错误:1) if (EAGAIN == errno || shutdown) 2) add if (shutDown) { break; } 从 recv while 循环中断后,即跳过 processMessage
【解决方案3】:

每当 zmq 上下文被销毁时,zmq_msg_recv 都会收到 -1。我在所有代码中都使用它作为终止条件。

while (!shutdown)
{
    ..
    ..
    int rc = zmq_msg_recv (&getMessage, socket, 0);
    if (rc != -1)
    {
      processMessage;
    }
    else 
      break;
}

请记住在 main() 的末尾销毁 zmq 上下文以进行适当的清理。

zmq_ctx_destroy(zctx); 

【讨论】:

    【解决方案4】:

    假设您有一个类 SUB(订阅者)来管理您的 ZMQ 消息的接收。在主函数/类的析构函数或退出函数中,调用以下代码:

    pub->close();
    
    ///
    /// Close the publish context
    ///
    void PUB::close()
    {
        zmq_close (socket);
        zmq_ctx_destroy (context);
    }
    

    这将使“recv”阻塞终止并出现您可以忽略的错误消息。应用程序将以正确的方式轻松退出。这是正确的方法。祝你好运!

    【讨论】:

    • 我从wiki.zeromq.org/whitepapers:0mq-termination 读到的是,当同一个套接字在recv 上等待时,从另一个线程调用zmq_close 无效。 “每个线程都必须对自己的套接字的打开和关闭负责”
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-16
    • 2015-06-18
    • 1970-01-01
    相关资源
    最近更新 更多