【问题标题】:How to serialise classes for UDP discover and TCP connection using boost::asio如何使用 boost::asio 为 UDP 发现和 TCP 连接序列化类
【发布时间】:2019-07-27 11:47:32
【问题描述】:

我正在尝试学习如何使用 boost::asio。 C++ 和 ASYNC 与我的正常编程实践非常不同。

我正在尝试编写一个程序,该程序使用 UDP 发现设备,然后建立到它的 TCP 连接。一旦建立 TCP 连接,程序就会停止 UDP 搜索停止。如果 TCP 连接断开或超时,UDP 搜索将重新开始。

我今天看了很多视频,包括https://www.youtube.com/watch?v=7FQwAjELMek。我的代码松散地基于所讨论的共享指针习语,因为这似乎是我得到的最接近解决方案的方法。

我开发了两个类。

  • udpFindQSYNC,我可以使用它来搜索使用 UDP 的设备/

  • tcpQSYNC,我可以使用它来使用 TCP 建立与设备的连接。

为了测试我的程序 - 我启动它,然后我使用 netcat 伪造一个 UDP 响应,然后使用不存在的 IP 地址让 TCP 连接超时,以尝试让程序循环回搜索。

回声“你好” | nc -lu 0.0.0.0 9720

#include <memory>
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

class tcpQSYNC : public std::enable_shared_from_this<tcpQSYNC> {

public:
    tcpQSYNC(boost::asio::io_context &ioc, std::string hostname, unsigned int tcpPort) :
            m_socket(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
            m_timer(ioc)
    {
        boost::asio::ip::tcp::resolver resolver(ioc);
        boost::asio::ip::tcp::resolver::query query(hostname, std::to_string(tcpPort));
        boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        m_remoteEndpoint = *endpoint_iterator;
    }

    ~tcpQSYNC() {
        std::cout << "tcpQSYNC destructor" << std::endl;
    }

    void run () {
        startConnection ();
        std::cout << "TCP Connection Started" << std::endl;
    }

    void startConnection() {
        m_socket.async_connect(m_remoteEndpoint,
                               [self = shared_from_this()](boost::system::error_code errorCode) {
                                   self->onConnectHandler(errorCode);
                               });
    }

    void onConnectHandler(const boost::system::error_code& error) {return;}
private:
    boost::asio::ip::tcp::socket m_socket;
    boost::asio::ip::tcp::endpoint m_remoteEndpoint;
    boost::asio::deadline_timer m_timer;
};

class udpFindQSYNC : public std::enable_shared_from_this<udpFindQSYNC> {
public:
    udpFindQSYNC(boost::asio::io_context &ioc, unsigned int udpPort) :
            m_socket(ioc, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
            m_localEndpoint(boost::asio::ip::address_v4::broadcast(), udpPort),
            m_timer(ioc) {

        m_socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
        m_socket.set_option(boost::asio::socket_base::broadcast(true));
    }

    ~udpFindQSYNC() {
        std::cout << "udpFindQSYNC() destructor" << std::endl;
    }

    void run() {
        sendUDPBroadcast();
    }

    void sendUDPBroadcast() {
        std::array<uint8_t, 2> data = {{0, 0}};

        m_socket.async_send_to(boost::asio::buffer(data, 2), m_localEndpoint,
                               [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                                   self->onBroadcastComplete(errorCode, bytes);
                               });
    }

    void onBroadcastComplete(const boost::system::error_code &errorCode, std::size_t bytes_transferred) {
        if (errorCode == boost::system::errc::success) {
            std::cout << "UDP Broadcast "<< bytes_transferred << " byte"<< ((bytes_transferred==1) ? "" : "s") << std::endl;
            queueRead();
            createTimer();
        } else {
            std::cout << __func__ << " (" << errorCode.message() << ")" << std::endl;
        }
    }

    void createTimer() {
        // 10 second retry timer
        m_timer.expires_from_now(boost::posix_time::milliseconds(10000));
        m_timer.async_wait(
                [self = shared_from_this()] (boost::system::error_code errorCode)
                {
                    self->onTimerExpiry(errorCode);
                });
    }

    void queueRead() {
        m_socket.async_receive_from(boost::asio::buffer(m_buffer), m_remoteEndpoint,
                                    [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                                        self->onReceiveData(errorCode, bytes);
                                    });
    }

    void onTimerExpiry(const boost::system::error_code &errorCode) {
        if (errorCode == boost::system::errc::success) {
            std::cout << "UDP Timer Expired" << std::endl;
            // Timer has expired.   Cancel outstanding read operation and start again
            m_socket.cancel();
            sendUDPBroadcast();
        } else if (errorCode == boost::system::errc::operation_canceled){
            std::cout << "Timer Operation Cancelled " << std::endl;
        }
    }

    void onReceiveData(const boost::system::error_code &errorCode, std::size_t bytes_transferred) {
        // Read has completed. Cancel the timer.
        m_timer.cancel();
        if (errorCode == boost::system::errc::success) {
            std::cout << "UDP Received Data " << bytes_transferred << " byte" <<((bytes_transferred==1) ? " " : "s ") << getIPAddress() << std::endl;
        } else if (errorCode == boost::system::errc::operation_canceled) {
            std::cout << "UDP Read Operation Cancelled " << std::endl;
        }
    }

    std::string getIPAddress() {
        std::cout << "Called getIPAddress() " << m_remoteEndpoint.address().to_string() << std::endl;
        return m_remoteEndpoint.address().to_string();
    }
private:
    boost::asio::ip::udp::socket m_socket;
    boost::asio::ip::udp::endpoint m_localEndpoint;
    boost::asio::ip::udp::endpoint m_remoteEndpoint;
    boost::asio::deadline_timer m_timer;

    std::array<uint8_t, 32> m_buffer = {0};
};

int main() {
    boost::asio::io_context ioc;
    boost::asio::io_context::strand strand(ioc);

    int loop =0;
    while (loop < 2) {
        auto udp = std::make_shared<udpFindQSYNC>(ioc, 9720);
        udp->run();
        std::string remote = udp->getIPAddress();  // Should return 192.168.0.140 in my case.
        std::cout << "Main " << remote << std::endl;

        // I want to get the address returned from the udpFindQSYNC.
        // I have hard code to no existant IP to cause timeout

        std::string nonextisthostname("192.168.0.143");
        std::make_shared<tcpQSYNC>(ioc, nonextisthostname, 9760)->run();

        loop++;
        // Run the I/O service on the main thread
        ioc.run();

我无法理解的事情

  1. 我如何从 udpFindQSYNC 类返回 IP 地址,以便 tcpQSYNC 类也进行连接。由于 udpFindQSYNC 析构函数已被调用。

  2. 我如何使用 io_context 在本质上是无限循环中依次运行两个单独的类。

我看了很多,但不知道如何在我的上下文中使用。我总是看到 TCP 连接同时运行到 UDP

我的程序产生的日志是:

UDP Broadcast 2 bytes
tcpQSYNC destructor
UDP Timer Expired
UDP Read Operation Cancelled 
UDP Broadcast 2 bytes
UDP Received Data 6 bytes Called getIPAddress() 192.168.0.140
192.168.0.140
Timer Operation Cancelled 
udpFindQSYNC() destructor      <- My class is detroyed
Called getIPAddress() 0.0.0.0   
Main 0.0.0.0                   <- Thus my result is wrong   
TCP Connection Started
tcpQSYNC destructor
udpFindQSYNC() destructor

有人能指出解决我无法弄清楚的两个问题的最佳方法吗?

【问题讨论】:

    标签: c++ boost boost-asio


    【解决方案1】:

    方法getIPAddress 可以在调用onReceiveData 处理程序之后或在此方法内调用。

    你的问题是getIPAddress被调用的太早了,m_remoteEndpoint还没有被填充,因为udp-&gt;run()立即返回并且handler - onReceiveData没有被调用。

    您的问题的可能解决方案:

    1) 在getIPAddress 中添加一些阻塞机制,直到调用onReceiveData,然后getIPAddress 可以结束并返回m_remoteEndpoint 地址

    2) 您从onReceiveData 拨打getIPAddress

    例如可以通过使用condition_variableisAddress 标志来实现第一种方式。在getIPAddress 内部,您正在使用带有谓词的条件变量调用wait,该谓词检查isAddress 是否设置为true。在处理程序onReceiveData 中,您将isAddress 设置为true,并通知条件变量。这种方法的缺点是,在main 中,您需要启动ioc.run() 工作的额外线程(在后台) - 以处理处理程序。如果没有这个main 线程将在getIPAddress 方法上被阻塞。

    第二种方式,主循环可以简化为:

    int loop =0;
    while (loop < 2) 
    {
        auto udp = std::make_shared<udpFindQSYNC>(ioc, 9720);
        udp->run();
    
        loop++;
        // Run the I/O service on the main thread
        ioc.run();
    }
    

    我认为这就是你想要的。您在 udp-&gt;run 中启动第一个异步操作,其余工作在处理程序中执行。

    tcpQSYNC 何时创建?在onReceiveData,因为这样你就知道要连接的另一端的地址。

    void onReceiveData(const boost::system::error_code &errorCode, std::size_t bytes_transferred) 
    {
        m_timer.cancel();
        if (errorCode == boost::system::errc::success) {
            std::cout << "UDP Received Data " << bytes_transferred << " byte" <<((bytes_transferred==1) ? " " : "s ") << getIPAddress() << std::endl;
         // m_remoteEndpoint is filled here
        std::make_shared<tcpQSYNC>(ioc, m_remoteEndpoint.address().to_string(), 9760)->run();
    
        } else if (errorCode == boost::system::errc::operation_canceled) {
            std::cout << "UDP Read Operation Cancelled " << std::endl;
        }
    }
    

    这个

    std::array<uint8_t, 2> data = {{0, 0}};
    
    m_socket.async_send_to(boost::asio::buffer(data, 2), m_localEndpoint,
              [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                    self->onBroadcastComplete(errorCode, bytes);
              });
    

    是未定义的行为。 data 是本地的。 async_send_to 立即返回。 boost::asio::buffer 不会复制传递的缓冲区。

    您可以将data 存储为类的数据成员,以确保缓冲区在执行async_send_to 时一直存在。或者将其放入shared_ptr 并按值将智能指针传递给 lambda - 数据的生命周期将延长。


    tcpQSYNC 中,为什么要将端点传递给m_socket

    m_socket(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0))
    

    此构造函数会将套接字绑定到给定的端点。做什么的?您是客户,而不是服务器。 你应该只传递协议:

    m_socket(ioc,boost::asio::ip::tcp::v4()),
    

    【讨论】:

      猜你喜欢
      • 2011-10-07
      • 2018-06-08
      • 2012-05-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多