【问题标题】:ZMQ C++ Event Loop Within Class类内的 ZMQ C++ 事件循环
【发布时间】:2013-11-28 02:27:51
【问题描述】:

我使用 ZMQ 的总体目标是避免陷入异步消息传递的困境; ZMQ 似乎是一个可移植且实用的解决方案。然而,大多数 ZeroMQ 文档 like this 以及我在 Google 上搜索过的许多其他 zmq 示例都是基于 helloworld.c 格式的。也就是说,它们都是int main(){}内部的简单程序代码。

我的问题是我想在类 c++ 单例类中“嵌入”一个 zmq“监听器”。我想“收听”消息然后处理它们。我正计划使用 zmq 的 PUSH -> PULL 套接字,以防万一。我不知道该怎么做是在内部“事件循环”中。

class foomgr {
    public:
        static foomgr& get_foomgr();
    // ...
    private:
        foomgr();
        foomgr(const &foomgr);
    // ...
        listener_() {
            // EVENT LOOP HERE
            // RECV and PROCESS ZMQ MSGS
            // while(true) DOES NOT WORK HERE
        }
    // ...
        zmq::context_t zmqcntx_;
        zmq::socket_t zmqsock_;
        const int zmqsock_linger_ = 1000;
    // ....
}

我显然不能在侦听器中使用while(true) 构造,因为无论我从哪里调用它都会阻塞。由于使用 ZMQ 的优点之一是我不必自己管理“侦听器”线程,因此必须弄清楚如何创建自己的线程来包装 listener_ 似乎很愚蠢。我迷失了解决方案。

注意:我是一个 c++ 新手,所以对大多数人来说可能很明显的事情对我来说不是。此外,我正在尝试使用通用“单词”,而不是库或特定语言以避免混淆。代码是用 -std=c++11 构建的,所以那些 构造很好。

【问题讨论】:

  • 您希望在哪个线程中调用这些事件回调?

标签: c++ event-handling zeromq


【解决方案1】:

ZMQ C++ 库没有实现消息轮询的侦听器模式。它将这项任务留给你来包装你自己的类。不过,它确实支持轮询新消息的非阻塞模式。

因此,您可以使用正确的代码以非阻塞方式将其封装在一个小循环中。

查看这个用 C++ 编写的Polling Example here on GitHub。请注意,它从 2 个套接字轮询,因此您需要稍微修改它以删除额外的代码。

您需要在自己的观察者实现中封装的重要部分如下:

zmq::message_t message;
zmq::poll (&items [0], 2, -1);

if (items [0].revents & ZMQ_POLLIN) {
    receiver.recv(&message);
    //  Process task
}

【讨论】:

  • 该问题询问的是 listener 成员函数,而不是 zmq 函数。基本上,我在问是否有任何方法可以向 ZMQ “注册侦听器”,以便 ZMQ 执行 while 循环而不是我的功能。
  • 我还提到过简单的“int main(){...}”示例没有帮助;然而,您在没有解决其与此用例的相关性的情况下准确地链接到了它。虽然我很欣赏这个建议,但它并没有解决问题。
【解决方案2】:

Zmq 在设计上不是线程安全的(截至目前的版本)。事实上,Zmq 强调:

不要使用或关闭套接字,除非在创建它们的线程中。 期间。

不应使用回调,因为调用回调的线程肯定与创建套接字的线程不同,这是被禁止的。

也许,您会发现有用的zmqHelper,一个小型库(只有两个类和几个函数),可以更轻松地在 C++ 中使用 Zmq 并强制(保证)线程不能共享套接字。

在示例部分中,您将了解如何执行最常见的任务。

希望对你有帮助。

代码 sn-p:在 ROUTER-DEALER 代理中使用 zmqHelper 进行轮询。

zmq::context_t theContext {1}; // 1 thread in the socket 
SocketAdaptor< ZMQ_ROUTER > frontend_ROUTER {theContext};
SocketAdaptor< ZMQ_DEALER > backend_DEALER {theContext};

frontend_ROUTER.bind ("tcp://*:8000");
backend_DEALER.bind ("tcp://*:8001");

while (true) {

  std::vector<std::string> lines;

  // 
  //  wait (blocking poll) for data in any socket
  // 
  std::vector< zmqHelper::ZmqSocketType * > list
    = {  frontend_ROUTER.getZmqSocket(),  backend_DEALER.getZmqSocket() };

  zmqHelper::ZmqSocketType *  from = zmqHelper::waitForDataInSockets ( list );

  // 
  //  there is data, where is it from?
  // 
  if ( from ==  frontend_ROUTER.getZmqSocket() ) {
    // from frontend, read ...
    frontend_ROUTER.receiveText (lines);

    // ... and resend
    backend_DEALER.sendText( lines );
  }
  else if ( from ==  backend_DEALER.getZmqSocket() ) {
    // from backend, read ...
    backend_DEALER.receiveText (lines);

    // ... and resend
    frontend_ROUTER.sendText( lines );
  } 
  else if ( from == nullptr ) {
    std::cerr << "Error in poll ?\n";
  }

} // while (true)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-09-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-06
    • 2018-01-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多