【问题标题】:boost asio async reading and writing to socket using queue使用队列提升 asio 异步读写到套接字
【发布时间】:2017-05-07 11:26:55
【问题描述】:

我正在开发一个简单的 TCP 服务器,它可以读取并将其消息写入线程安全队列。然后应用程序可以使用这些队列安全地读取和写入套接字,即使是从不同的线程。

我面临的问题是我不能async_read。我的队列有 pop 操作,它返回下一个要处理的元素,但如果没有可用的元素,它会阻塞。因此,一旦我调用 pop ,async_read 回调当然不会再被触发。有没有办法可以将这样的队列集成到 boost asio 中,还是必须完全重写?

以下是我为说明我遇到的问题而制作的一个简短示例。一旦建立了 TCP 连接,我将创建一个新线程,该线程将在该 tcp_connection 下运行应用程序。之后我想开始async_readasync_write。我已经为此烦恼了几个小时,我真的不知道如何解决这个问题。

class tcp_connection : public std::enable_shared_from_this<tcp_connection>
{
public:
    static std::shared_ptr<tcp_connection> create(boost::asio::io_service &io_service) {
        return std::shared_ptr<tcp_connection>(new tcp_connection(io_service));
    }

    boost::asio::ip::tcp::socket& get_socket()
    {
        return this->socket;
    }

    void app_start()
    {
        while(1)
        {
            // Pop is a blocking call.
            auto inbound_message = this->inbound_messages.pop();
            std::cout << "Got message in app thread: " << inbound_message << ". Sending it back to client." << std::endl;
            this->outbound_messages.push(inbound_message);
        }
    }

    void start() {
        this->app_thread = std::thread(&tcp_connection::app_start, shared_from_this());

        boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
            strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));

        // Start async writing here. The message to send are in the outbound_message queue. But a Pop operation blocks
        // empty() is also available to check whether the queue is empty.
        // So how can I async write without blocking the read.
        // block...
        auto message = this->outbound_messages.pop();
        boost::asio::async_write(this->socket, boost::asio::buffer(message),
            strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
    }

    void handle_read(const boost::system::error_code& e, size_t bytes_read)
    {
        std::cout << "handle_read called" << std::endl;
        if (e)
        {
            std::cout << "Error handle_read: " << e.message() << std::endl;
            return;
        }
        if (bytes_read != 0)
        {
            std::istream istream(&this->input_stream);
            std::string message;
            message.resize(bytes_read);
            istream.read(&message[0], bytes_read);
            std::cout << "Got message: " << message << std::endl;
            this->inbound_messages.push(message);
        }
        boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
            strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
    }

    void handle_write(const boost::system::error_code& e, size_t /*bytes_transferred*/)
    {
        if (e)
        {
            std::cout << "Error handle_write: " << e.message() << std::endl;
            return;
        }

        // block...
        auto message = this->outbound_messages.pop();
        boost::asio::async_write(this->socket, boost::asio::buffer(message),
            strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
    }



private:
    tcp_connection(boost::asio::io_service& io_service) : socket(io_service), strand(io_service)
    {
    }

    boost::asio::ip::tcp::socket socket;
    boost::asio::strand strand;
    boost::asio::streambuf input_stream;

    std::thread app_thread;

    concurrent_queue<std::string> inbound_messages;
    concurrent_queue<std::string> outbound_messages;
};

class tcp_server
{
public:
    tcp_server(boost::asio::io_service& io_service)
        : acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9001))
    {
        start_accept();
    }

private:
    void start_accept()
    {
        std::shared_ptr<tcp_connection> new_connection =
            tcp_connection::create(acceptor.get_io_service());

        acceptor.async_accept(new_connection->get_socket(),
            boost::bind(&tlcp_tcp_server::handle_accept, this, new_connection, boost::asio::placeholders::error));
    }

    void handle_accept(std::shared_ptr<tcp_connection> new_connection,
                       const boost::system::error_code& error)
    {
        if (!error)
        {
            new_connection->start();
        }

        start_accept();
    }

    boost::asio::ip::tcp::acceptor acceptor;
};

【问题讨论】:

    标签: c++ boost boost-asio


    【解决方案1】:

    在我看来,您好像需要一个 async_pop 方法,该方法采用错误消息占位符和回调处理程序。当您收到消息时,检查是否有未完成的处理程序,如果有,弹出消息,取消注册处理程序并调用它。同样,在注册 async_pop 时,如果已经有消息等待,则弹出消息并发布对处理程序的调用而不注册它。

    您可能希望从 pop_operation 或类似类型的多态基基派生 async_pop 类。

    【讨论】:

    • 谢谢!我没有想到我可以自己创建一个这样的处理程序。就像我使用strand.wrap 一样,这应该可以正常工作。我有一个问题是为什么我需要错误消息占位符?
    • @JohnSmith 我的错误。您需要一个消息占位符(std::string& ?),如果消息队列已用尽并且有读取操作,您需要将最后一次不成功的 io 读取操作中的任何错误代码传递回异步处理程序错误。
    • 是的,我现在已经实现了。在代码中它不是一个字符串,但用示例字符串进行演示是最简单的。明天我会在另一个答案中发布代码,但我会让你的代码被接受。仅适用于想知道同样事情的人。
    猜你喜欢
    • 1970-01-01
    • 2011-11-26
    • 2011-12-05
    • 2012-03-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多