【问题标题】:How to use multiclient Server using boost::thread_group?如何使用 boost::thread_group 使用多客户端服务器?
【发布时间】:2023-04-04 02:29:01
【问题描述】:

我想使用 boost::asio 和 boost::thread_group 创建多客户端服务器。

我试过了

//Listener.h 
#pragma once
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <memory>
#include <boost/asio/io_context.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/thread/thread.hpp>
#include "Session.h"
#define PORT_NUMBER 12880

class Listener
{
public:
    Listener(boost::asio::io_context& iosvc, boost::asio::ip::tcp::endpoint& ep);

    void Start_accept();


private:
    boost::asio::io_context &_iosvc;
    boost::asio::ip::tcp::acceptor _acceptor;
    boost::thread_group threadPool;
    void OnAcceptComplete(const boost::system::error_code& error, Session* session);
};

这是 Listener.cpp 文件

//Listener.cpp
#pragma once
#include <iostream>
#include "Listener.h"
#include "SessionManager.h"

Listener::Listener(boost::asio::io_context& iosvc, boost::asio::ip::tcp::endpoint& ep) :
    _acceptor(iosvc, ep), _iosvc(iosvc) {}


void Listener::Start_accept()
{
    //Create Client Session
    Session* new_session = clientsession::SessionManager::GetInstance()->GenerateSession(_iosvc);
    std::cout << "Listener thread ID : " << std::this_thread::get_id << '\n';
    std::cout << "Listening....." << "\n\n";
    _acceptor.async_accept(*(new_session->Socket()),
        boost::bind(&Listener::OnAcceptComplete, this, boost::asio::placeholders::error, new_session));
}
void Listener::OnAcceptComplete(const boost::system::error_code& error, Session* session)
{
    if (!error) 
    {
        std::cout << "accept completed" << '\n';

        threadPool.create_thread(boost::bind(&boost::asio::io_context::run, &(session->_iocontext)));
        session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.
        
        threadPool.join_all();
    }

    else
    {
        std::cout << error.message() << '\n';
    }
    Start_accept();
}

但是 session->_iocontext.post(boost::bind(&Session::StartSession,this)); // 错误'->': 无法从'T' 转换为'Session *'。

Server的主要语句如下。

//Server.cpp
#include <iostream>
#include <boost/thread/thread.hpp>
#include "Listener.h"
#include "SessionManager.h"
using namespace boost::asio;
using namespace clientsession;
SessionManager* _sessionManager;

int main()
{
    try
    {
        boost::asio::io_context iocontext;
        ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), PORT_NUMBER); //ipv4, 포트번호
        Listener* _listener = new Listener(iocontext, ep);

        _listener->Start_accept();
        iocontext.run();

    }
    catch (std::exception& e)
    {
        std::cout << e.what() << '\n';
    }
    while (true)
    {
    }
    return 0;

}

我想做多线程服务器[一个 io_context 监听器(包括接受),多个 io_context 会话(用于客户端)]

换句话说 我想更改为每个客户端创建会话线程的结构。 我不知道我创建的代码是否正确。

可以进行 N:1 通信,但会话处理是同步进行的。所以我想让一个线程成为一个会话并异步处理它。

【问题讨论】:

    标签: c++ multithreading boost


    【解决方案1】:

    换句话说,我想更改为每个客户端创建会话线程的结构。

    这是 ASIO 中的反模式。您可以,但请阅读以下内容:https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/overview/core/threads.html

    在 Asio 中拥有单独线程的一种自然方式是每个线程也有一个 io_context。然而,一个更自然的方式是每个客户端都有一个 strand(“逻辑线程”)和一个反映系统上真实内核数的固定线程池.

    我不知道我创建的代码是否正确。

    实际上是这样,因为它无法编译。

        session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.
    

    this 在这里是Listener*。当然,您不能将Session 的方法绑定到Listener 类的实例。你想要的是会话:

    post(session->_iocontext, boost::bind(&Session::StartSession, session));
    

    接下来:

    • threadPool.join_all(); 出现在OnAcceptedComplete 中,这意味着在考虑接受任何其他连接之前,您实际上会等待所有线程。这意味着您实际上最好使用单线程阻塞服务器。

    • 你需要INVOKE std::this_thread::get_id:加括号std::this_thread::get_id()(让我想起了node addon with cluster get process id returns same id for all forked process's

    • 所有原始指针和 new 没有 delete 将导致内存泄漏或过时指针的噩梦。

      • 为什么监听器不在栈上?

          Listener _listener(iocontext, ep);
          _listener.Start_accept();
        
      • 为什么Socket() 不返回引用?现在你只能像这样使用它

          tcp::socket _socket;
          auto* Socket() { return &_socket; }
          // ...
          _acceptor.async_accept(*(new_session->Socket()),
        

        哎呀。更简单:

          auto& Socket() { return _socket; }
          // ....
          _acceptor.async_accept(new_session->Socket(),
        
      • 等等

    • 您将StartSession 发布到 io 服务:

       post(/*some service*/, boost::bind(&Session::StartSession, session));
      

      确实,这仅适用于该线程使用链的私有服务。使用 strands 似乎仍然是多余的,因为这总是发生在新接受的会话上,所以没有其他线程可能知道它。

      依赖线程本地存储时例外?

      或者您对GenerateSession 的实施可能不会产生独特的会话?

    最小的修复

    这是我的最小修复。我建议不要使用多线程(反正你是在做异步 IO)。

    #define PORT_NUMBER 12880
    #include <boost/asio.hpp>
    #include <boost/bind/bind.hpp>
    #include <iomanip>
    #include <iostream>
    #include <memory>
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    
    struct Session : std::enable_shared_from_this<Session> {
        template <typename Executor>
        Session(Executor executor) : _socket(make_strand(executor)) {}
    
        auto& Socket() { return _socket; }
    
        void StartSession() {
            std::cout << "Session: " << this << " StartSession" << std::endl;
            async_read(_socket, boost::asio::dynamic_buffer(received),
               [self = shared_from_this()](auto ec, size_t tx) { self->on_received(ec, tx); });
        };
    
      private:
        tcp::socket _socket;
        std::string received;
        void on_received(error_code ec, size_t /*bytes_transferred*/) {
            std::cout << "Session: " << this << " on_received " << ec.message() << " " << std::quoted(received) << std::endl;
        }
    };
    
    using SessionPtr = std::shared_ptr<Session>;
    
    namespace clientsession {
        struct SessionManager {
            static SessionManager* GetInstance() {
                static SessionManager instance;
                return &instance;
            }
            template <typename... T> SessionPtr GenerateSession(T&&... args) {
                return std::make_shared<Session>(std::forward<T>(args)...);
            }
        };
    } // namespace clientsession
    
    class Listener {
        public:
            Listener(boost::asio::io_context& iosvc, tcp::endpoint ep)
                : _iosvc(iosvc), _acceptor(iosvc, ep) {}
    
            void Start_accept() {
                // Create Client Session
                auto new_session = clientsession::SessionManager::GetInstance()
                    ->GenerateSession(_iosvc.get_executor());
    
                std::cout << "Listening.....\n\n";
                _acceptor.async_accept(new_session->Socket(),
                        boost::bind(&Listener::OnAcceptComplete, this,
                            boost::asio::placeholders::error,
                            new_session));
            }
    
        private:
            boost::asio::io_context& _iosvc;
            tcp::acceptor _acceptor;
            void OnAcceptComplete(error_code error, SessionPtr session) {
                if (!error) {
                    std::cout << "accept completed\n";
                    session->StartSession();
                    Start_accept();
                } else {
                    std::cout << error.message() << '\n';
                }
            }
    };
    
    int main() {
        try {
            boost::asio::io_context iocontext;
            Listener listener(iocontext, {{}, PORT_NUMBER}); // ipv4, port number
            listener.Start_accept();
    
            iocontext.run();
        } catch (std::exception& e) {
            std::cout << e.what() << '\n';
        }
    }
    

    在同时使用一堆客户端进行测试时:

    for a in {1..10};
    do
        (sleep 1.$RANDOM; echo -n "hellow world $RANDOM") |
             netcat -w 2 localhost 12880&
    done; 
    time wait
    

    打印出类似的东西

    Listening.....
    
    accept completed
    Session: 0x56093453c1b0 StartSession
    Listening.....
    
    accept completed
    Session: 0x56093453de60 StartSession
    Listening.....
    
    accept completed
    Session: 0x56093453e3c0 StartSession
    Listening.....
    
    accept completed
    Session: 0x56093453e920 StartSession
    Listening.....
    
    accept completed
    Session: 0x56093453ee80 StartSession
    Listening.....
    
    accept completed
    Session: 0x56093453f3e0 StartSession
    Listening.....
    
    accept completed
    Session: 0x56093453f940 StartSession
    Listening.....
    
    accept completed
    Session: 0x56093453fea0 StartSession
    Listening.....
    
    accept completed
    Session: 0x560934540400 StartSession
    Listening.....
    
    accept completed
    Session: 0x560934540960 StartSession
    Listening.....
    
    Session: 0x56093453f940 on_received End of file "hellow world 10149"
    Session: 0x56093453fea0 on_received End of file "hellow world 22492"
    Session: 0x560934540400 on_received End of file "hellow world 29539"
    Session: 0x56093453c1b0 on_received End of file "hellow world 20494"
    Session: 0x56093453ee80 on_received End of file "hellow world 24735"
    Session: 0x56093453de60 on_received End of file "hellow world 8071"
    Session: 0x560934540960 on_received End of file "hellow world 27606"
    Session: 0x56093453e920 on_received End of file "hellow world 534"
    Session: 0x56093453e3c0 on_received End of file "hellow world 21676"
    Session: 0x56093453f3e0 on_received End of file "hellow world 24362"
    

    【讨论】:

    • 有关管理会话生命周期的一些灵感/替代方法,您可以阅读此处的链接stackoverflow.com/a/65023952/85371
    • 改进示例以突出会话是同时进行的
    • 您真诚的回答对我帮助很大。我还发现了“this -> Listener -> session”错误 :) 我得到了很多灵感。谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-11-20
    • 1970-01-01
    • 1970-01-01
    • 2016-05-31
    • 1970-01-01
    • 2022-01-17
    • 1970-01-01
    相关资源
    最近更新 更多