【问题标题】:C++ ASIO: Asynchronous sockets and threadingC++ ASIO:异步套接字和线程
【发布时间】:2016-03-03 13:00:47
【问题描述】:

我的应用程序基于 asio 聊天示例,由客户端和服务器组成: - 客户端:连接到服务器,接收请求并响应它 - 服务器:有一个 QT GUI(主线程)和一个网络服务(单独的线程)监听连接,向特定客户端发送请求并解释来自/在 GUI 中的响应

我想以异步方式实现这一点,以避免每个客户端连接使用单独的线程。

在我的 QT 窗口中,我有一个 io_service 实例和一个我的网络服务实例:

io_service_ = new asio::io_service();
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), "1234");
service_ = new Service(*io_service_, endpoint, this);

asio::io_service* ioServicePointer = io_service_;
t = std::thread{ [ioServicePointer](){ ioServicePointer->run(); } };

我希望能够向一个客户端发送数据,如下所示:

service_->send_message(selectedClient.id, msg);

我正在通过观察者模式接收和处理响应(窗口实现 IStreamListener 接口)

Service.cpp:

#include "Service.h"
#include "Stream.h"

void Service::runAcceptor()
{
    acceptor_.async_accept(socket_,
        [this](asio::error_code ec)
    {
        if (!ec)
        {
            std::make_shared<Stream>(std::move(socket_), &streams_)->start();

        }

        runAcceptor();
    });
}

void Service::send_message(std::string streamID, chat_message& msg)
{

    io_service_.post(
        [this, msg, streamID]()
    {
        auto stream = streams_.getStreamByID(streamID);
        stream->deliver(msg);
    }); 

}

流.cpp:

#include "Stream.h"
#include <iostream>
#include "../chat_message.h"

Stream::Stream(asio::ip::tcp::socket socket, StreamCollection* streams)
    : socket_(std::move(socket))
{
    streams_ = streams;         // keep a reference to the streamCollection

    // retrieve endpoint ip
    asio::ip::tcp::endpoint remote_ep = socket_.remote_endpoint();
    asio::ip::address remote_ad = remote_ep.address();
    this->ip_ = remote_ad.to_string();      
}

void Stream::start()
{
    streams_->join(shared_from_this());
    readHeader();
}

void Stream::deliver(const chat_message& msg)
{
    bool write_in_progress = !write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!write_in_progress)
    {
        write();
    }
}

std::string Stream::getName()
{
    return name_;
}

std::string Stream::getIP()
{
    return ip_;
}


void Stream::RegisterListener(IStreamListener *l)
{
    m_listeners.insert(l);
}

void Stream::UnregisterListener(IStreamListener *l)
{
    std::set<IStreamListener *>::const_iterator iter = m_listeners.find(l);
    if (iter != m_listeners.end())
    {
        m_listeners.erase(iter);
    }
    else {
        std::cerr << "Could not unregister the specified listener object as it is not registered." << std::endl;
    }
}

void Stream::readHeader()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.data(), chat_message::header_length),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec && read_msg_.decode_header())
        {
            readBody();
        }
        else if (ec == asio::error::eof || ec == asio::error::connection_reset)
        {
            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onStreamDisconnecting(this->id()); });
            streams_->die(shared_from_this());
        }
        else
        {
            std::cerr << "Exception: " << ec.message();
        }
    });
}

void Stream::readBody()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
                    // notify the listener (GUI) that a response has arrived and pass a reference to it

            auto msg = std::make_shared<chat_message>(std::move(read_msg_));

            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onMessageReceived(msg); });

            readHeader();
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}

void Stream::write()
{
    auto self(shared_from_this());
    asio::async_write(socket_,
        asio::buffer(write_msgs_.front().data(),
        write_msgs_.front().length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
            write_msgs_.pop_front();
            if (!write_msgs_.empty())
            {
                write();
            }
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}

接口

 class IStream
{
public: 
    /// Unique stream identifier
    typedef void* TId;
    virtual TId id() const
    {
        return (TId)(this);
    }

    virtual ~IStream() {}
    virtual void deliver(const chat_message& msg) = 0;

    virtual std::string getName() = 0;
    virtual std::string getIP() = 0;

    /// observer pattern    
    virtual void RegisterListener(IStreamListener *l) = 0;
    virtual void UnregisterListener(IStreamListener *l) = 0;
};

 class IStreamListener
{
public:
    virtual void onStreamDisconnecting(IStream::TId streamId) = 0;
    virtual void onMessageReceived(std::shared_ptr<chat_message> msg) = 0;

};

/*
    streamCollection / service delegates
*/
class IStreamCollectionListener
{
public:
    virtual void onStreamDied(IStream::TId streamId) = 0;
    virtual void onStreamCreated(std::shared_ptr<IStream> stream) = 0;

};

StreamCollection 基本上是一组 IStream:

 class StreamCollection
{
public:
    void join(stream_ptr stream)
    {
        streams_.insert(stream);
        std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamCollectionListener *l) {l->onStreamCreated(stream); });


    }
    // more events and observer pattern inplementation

首先:代码按预期工作。

我的问题: 这是 ASIO 应该用于异步编程的方式吗?我特别不确定Service::send_message 方法和io_service.post 的使用。就我而言,它的目的是什么?当我只是调用 async_write 时它也确实有效,而没有将其包装在 io_service.post 调用中。

我是否遇到了这种方法的问题?

【问题讨论】:

    标签: c++ boost boost-asio


    【解决方案1】:

    Asio 是 designed 被接受而不是框架。因此,有多种方法可以成功使用它。分离 GUI 和网络线程,并使用异步 I/O 来实现可扩展性可能是个好主意。

    在公共 API(例如 Service::send_message())中将工作委派给 io_service 会产生以下后果:

    • 将调用者的线程与服务io_service 的线程分离。例如,如果Stream::write() 执行耗时的加密功能,则调用者线程 (GUI) 不会受到影响。
    • 它提供线程安全。 io_service 是线程安全的;但是 socket 不是线程安全的。此外,其他对象可能不是线程安全的,例如write_msgs_。 Asio 保证只会从运行io_servce 的线程中调用处理程序。因此,如果只有一个线程在运行io_service,则不可能并发,socket_write_msgs_ 都将以线程安全的方式访问。 Asio 将此称为 隐式 strand。如果有多个线程在处理io_service,则可能需要使用显式strand 来提供线程安全。有关股线的更多详细信息,请参阅this 答案。

    其他 Asio 注意事项:

    • 观察者在处理程序中被调用,处理程序在网络线程中运行。如果任何观察者需要很长时间才能完成,例如必须与 GUI 线程接触的各种共享对象同步,那么它可能会导致其他操作的响应能力很差。考虑使用队列在 observersubject 组件之间代理事件。例如,可以使用另一个 io_service 作为队列,由它自己的线程运行,然后发布到其中:

      auto msg = std::make_shared<chat_message>(std::move(read_msg_));
      for (auto l: m_listeners)
          dispatch_io_service.post([=](){ l->onMessageReceived(msg); });
      
    • 验证write_msgs_ 的容器类型不会使push_back() 上现有元素和pop_front() 上的其他元素的迭代器、指针和引用无效。例如,使用std::liststd::dequeue 是安全的,但std::vector 可能会使对push_back 上现有元素的引用无效。

    • 对于单个Stream,可以多次调用StreamCollection::die()。此函数应该是幂等的或适当地处理副作用。
    • 在给定Stream 失败时,它的侦听器仅在一个路径中被通知断开连接:未能读取具有asio::error::eofasio::error::connection_reset 错误的标头。其他路径不调用IStreamListener.onStreamDisconnecting()
      • 标头已读取,但解码失败。在这种特殊情况下,整个读取链将在不通知其他组件的情况下停止。出现问题的唯一迹象是向std::cerr 发送打印声明。
      • 读取正文失败时。

    【讨论】:

    • 代码审查反馈:考虑使用基于范围的 for 循环;与样式和命名保持一致:streams_ vs m_listenersServiceStream 系列类型可以受益于更有意义的名称(例如是 IStream 和输入流还是流接口)?
    • 非常感谢您的反馈!
    猜你喜欢
    • 1970-01-01
    • 2011-12-05
    • 2018-03-25
    • 1970-01-01
    • 2010-11-28
    • 1970-01-01
    • 2016-03-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多