【问题标题】:Why io_context sopped in my boost asio coroutine server为什么 io_context 出现在我的 boost asio 协程服务器中
【发布时间】:2020-03-12 23:10:07
【问题描述】:

我的服务器基于 boost spawn echo 服务器示例,并在 this thread 中进行了改进。真正的服务器很复杂,我做了一个更简单的服务器来说明问题:

服务器监听 12345 端口,从新连接接收 0x4000 字节数据。

客户端运行1000个线程,连接服务器,发送0x4000字节数据。

问题:当客户端运行时,1秒后通过控制台中的Ctrl-C杀死客户端进程,然后服务器的io_context将停止,服务器运行到无限循环并消耗 100% cpu。如果这没有发生,重复启动客户端并杀死它几次,它就会发生。可能几次后它会耗尽 TCP 端口,等几分钟再试一次,它会在我的机器上杀死客户端 3~15 次后发生。

boost document 表示io_context.stopped() 用于判断是否停止

要么通过显式调用 stop(),要么由于工作用完

我从不调用io_context.stop(),并使用make_work_guard(io_context) 保持io_context 不停止,但为什么它仍然停止?

我的环境:Win10-64bit, boost 1.71.0

服务器代码:

#include <iostream>
using namespace std;

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
namespace ba=boost::asio;

#define SERVER_PORT 12345
#define DATA_LEN 0x4000


struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    boost::asio::steady_timer timer_;
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;

    explicit session(boost::asio::io_context& io_context, tcp::socket socket)
    : socket_(std::move(socket)),
      timer_(io_context),
      strand_(io_context.get_executor())
    { }

    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
            spawn(yield, [this, self](ba::yield_context yield) {
                timer_.expires_from_now(10s); // 10 second
                while (socket_.is_open()) {
                    boost::system::error_code ec;
                    timer_.async_wait(yield[ec]);
                    // timeout triggered, timer was not canceled
                    if (ba::error::operation_aborted != ec) {
                        socket_.close();
                    }
                }
            });

            try
            {
                // recv data
                string packet;

                // read data
                boost::system::error_code ec;

                ba::async_read(socket_,
                               ba::dynamic_buffer(packet),
                               ba::transfer_exactly(DATA_LEN),
                               yield[ec]);
                if(ec) {
                    throw "read_fail";
                }

            }
            catch (...)
            {
                cout << "exception" << endl;
            }

            timer_.cancel();
            socket_.close();
        });

    }
};
struct my_server {  
    my_server() { }
    ~my_server() { } 

    void start() {
        ba::io_context io_context;
        auto worker = ba::make_work_guard(io_context);

        ba::spawn(io_context, [&](ba::yield_context yield)
        {
            tcp::acceptor acceptor(io_context,
            tcp::endpoint(tcp::v4(), SERVER_PORT));

            for (;;)
            {
                boost::system::error_code ec;

                tcp::socket socket(io_context);
                acceptor.async_accept(socket, yield[ec]);
                if (!ec) {
                    std::make_shared<session>(io_context, std::move(socket))->go();
                } 
            }
        });

        // Run io_context on All CPUs
        auto thread_count = std::thread::hardware_concurrency();
        boost::thread_group tgroup;
        for (auto i = 0; i < thread_count; ++i) 
            tgroup.create_thread([&] {
                for (;;) {
                    try { 
                        if (io_context.stopped()) { // <- this happens after killing Client process several times
                            cout << "io_context STOPPED, now server runs infinit loop with full cpu usage" << endl;
                        }
                        io_context.run(); 
                    }
                    catch(const std::exception& e) { 
                        MessageBox(0, "This never popup", e.what(), 0); 
                    }
                    catch(const boost::exception& e) { 
                        MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0); 
                    }
                    catch(...) { MessageBox(0, "This never popup", "", 0); }
                }
            });
        tgroup.join_all();
    }
};  

int main() {
    my_server svr;
    svr.start();
}

客户:

#include <iostream>
#include <random>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using namespace std;

using boost::asio::ip::tcp;
namespace ba=boost::asio;

#define SERVER "127.0.0.1"
#define PORT "12345"

int main() {
    boost::asio::io_context io_context;

    static string data_0x4000(0x4000, 'a');

    boost::thread_group tgroup;
    for (auto i = 0; i < 1000; ++i) 
        tgroup.create_thread([&] {
            for(;;) {

                try {
                    tcp::socket s(io_context);
                    tcp::resolver resolver(io_context);
                    boost::asio::connect(s, resolver.resolve(SERVER, PORT));

                    ba::write(s, ba::buffer(data_0x4000));
                } catch (std::exception e) {
                    cout << " exception: " << e.what() << endl;
                } catch (...) {
                    cout << "unknown exception" << endl;
                }
            }
        });

    tgroup.join_all();

    return 0;
}

更新解决方法:

我猜io_context 和协程会出现问题,所以我尝试将不必要的spawn 替换为std::thread,并且它起作用了,io_context 不再停止。但是为什么还是会出现这个问题呢?

替换:

ba::spawn(io_context, [&](ba::yield_context yield)
{
    tcp::acceptor acceptor(io_context,
    tcp::endpoint(tcp::v4(), SERVER_PORT));

    for (;;)
    {
        boost::system::error_code ec;

        tcp::socket socket(io_context);
        acceptor.async_accept(socket, yield[ec]);
        if (!ec) {
            std::make_shared<session>(io_context, std::move(socket))->go();
        } 
    }
});

收件人:

std::thread([&]()
{
    tcp::acceptor acceptor(io_context,
    tcp::endpoint(tcp::v4(), SERVER_PORT));

    for (;;)
    {
        boost::system::error_code ec;

        tcp::socket socket(io_context);
        acceptor.accept(socket, ec);
        if (!ec) {
            std::make_shared<session>(io_context, std::move(socket))->go();
        } 
    }
}).detach();

【问题讨论】:

    标签: c++ boost-asio boost-coroutine


    【解决方案1】:

    即使进行(非常)广泛的压力测试,我也无法在 linux 上重现您的问题。

    除了某些会话按预期到达“EOF”消息之外,即使是硬杀客户端进程也没有显示任何其他影响。

    存在可用端口用完的问题,但这主要是因为您在客户端中重新连接的速度太快了。

    跳出框框思考

    • 会不会是您在没有同步的情况下使用std::cout 和/或MessageBox² 而MSVC 的标准库不能很好地处理它?
    • 会不会是 asio 运行循环引发了 catch 处理程序未正确捕获的异常?我不知道这是否相关,但 MSVC 确实有 SEH(结构化异常)¹
    • 没有必要在那里保持“运行”的紧密循环。如果你真的想继续运行循环,你应该在两者之间调用io_context.restart();。我不建议这样做,因为它会使任何常规关机变得不可能。

    如果您有兴趣,可以对代码进行一些小的调整。它添加了一些处理/制作的会话/连接的可视化。请注意,client 几乎没有变化,但 server 有一些变化可能会激发您的灵感:

    server.cpp

    #include <iostream>
    #include <iomanip>
    
    #include <boost/thread/thread.hpp>
    #include <boost/asio.hpp>
    #include <boost/asio/spawn.hpp>
    
    namespace ba = boost::asio;
    using boost::asio::ip::tcp;
    using namespace std::literals;
    
    #define SERVER_PORT 12345
    #define DATA_LEN 0x4000
    
    void MessageBox(int, std::string const& caption, std::string const& message, ...) {
        std::cerr << caption << ": " << std::quoted(message) << std::endl;
    }
    
    struct session : public std::enable_shared_from_this<session>
    {
        tcp::socket socket_;
        ba::steady_timer timer_;
        ba::strand<ba::io_context::executor_type> strand_;
    
        explicit session(ba::io_context& io_context, tcp::socket socket)
        : socket_(std::move(socket)),
          timer_(io_context),
          strand_(io_context.get_executor())
        { }
    
        void go()
        {
            auto self(shared_from_this());
            ba::spawn(strand_, [this, self](ba::yield_context yield)
            {
                spawn(yield, [this, self](ba::yield_context yield) {
                    while (socket_.is_open()) {
                        timer_.expires_from_now(10s); 
                        boost::system::error_code ec;
                        timer_.async_wait(yield[ec]);
                        // timeout triggered, timer was not canceled
                        if (ba::error::operation_aborted != ec) {
                            socket_.close(ec);
                        }
                    }
                });
    
                try
                {
                    // recv data
                    std::string packet;
    
                    // read data
                    ba::async_read(socket_,
                                   ba::dynamic_buffer(packet),
                                   ba::transfer_exactly(DATA_LEN),
                                   yield);
    
                    std::cout << std::unitbuf << ".";
                }
                catch (std::exception const& e) {
                    std::cout << "exception: " << std::quoted(e.what()) << std::endl;
                }
                catch (...) {
                    std::cout << "exception" << std::endl;
                }
    
                boost::system::error_code ec;
                timer_.cancel(ec);
                socket_.close(ec);
            });
    
        }
    };
    
    struct my_server {  
        void start() {
            ba::io_context io_context;
            auto worker = ba::make_work_guard(io_context);
    
            ba::spawn(io_context, [&](ba::yield_context yield)
            {
                tcp::acceptor acceptor(io_context,
                tcp::endpoint(tcp::v4(), SERVER_PORT));
    
                for (;;)
                {
                    boost::system::error_code ec;
    
                    tcp::socket socket(io_context);
                    acceptor.async_accept(socket, yield[ec]);
                    if (!ec) {
                        std::make_shared<session>(io_context, std::move(socket))->go();
                    } 
                }
            });
    
            // Run io_context on All CPUs
            auto thread_count = std::thread::hardware_concurrency();
            boost::thread_group tgroup;
            for (auto i = 0u; i < thread_count; ++i) 
                tgroup.create_thread([&] {
                    for (;;) {
                        try { 
                            io_context.run(); 
                            break;
                        }
                        catch(const std::exception& e) { 
                            MessageBox(0, "This never popup", e.what(), 0); 
                        }
                        catch(const boost::exception& e) { 
                            MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0); 
                        }
                        catch(...) { MessageBox(0, "This never popup", "", 0); }
                    }
    
                    std::cout << "stopped: " << io_context.stopped() << std::endl;
                });
            tgroup.join_all();
        }
    };  
    
    int main() {
        my_server svr;
        svr.start();
    }
    

    client.cpp

    #include <iostream>
    #include <random>
    #include <thread>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    
    using boost::asio::ip::tcp;
    namespace ba=boost::asio;
    
    #define SERVER "127.0.0.1"
    #define PORT "12345"
    
    int main() {
        ba::io_context io_context;
    
        static std::string const data_0x4000(0x4000, 'a');
    
        boost::thread_group tgroup;
        for (auto i = 0; i < 1000; ++i) 
            tgroup.create_thread([&] {
                for(;;) {
    
                    try {
                        tcp::socket s(io_context);
    
                        tcp::resolver resolver(io_context);
                        ba::connect(s, resolver.resolve(SERVER, PORT));
                        s.set_option(ba::socket_base::reuse_address(true));
    
                        ba::write(s, ba::buffer(data_0x4000));
                    } catch (std::exception const& e) {
                        std::cout << " exception: " << e.what() << std::endl;
                    } catch (...) {
                        std::cout << "unknown exception" << std::endl;
                    }
                    std::cout << std::unitbuf << ".";
                }
            });
    
        tgroup.join_all();
    }
    

    ¹ 参见例如https://docs.microsoft.com/en-us/cpp/build/reference/eh-exception-handling-model?view=vs-2019#remarks

    ² 也许MessageBox 只允许来自“UI”线程。

    【讨论】:

    • 如果我删除了MessageBox,就会出现问题。如果是 SEH,服务器会崩溃,因为我没有用 __try {} __except{} 块捕获它。 io_context.restart() 是一种解决方法,但无法解释问题。当端口用完时,我只需等待几分钟让操作系统释放端口,然后再做一次。我已经更新了帖子以寻求解决方法。
    猜你喜欢
    • 2020-06-17
    • 2021-11-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多