书接上文 继续分析Socket.h SocketMgr.h
template<class T>
class Socket : public std::enable_shared_from_this<T>
根据智能指针的使用规则 当类的成员函数需要把指向自身的shared_ptr交给外部(例如绑定到异步回调)时 必须继承自enable_shared_from_this<> 并通过shared_from_this()获取指针 如果直接用this构造新的shared_ptr 会产生独立的引用计数 导致对象被重复释放的BUG
class Socket封装了asio中的socket类 获取远端ip 端口等功能, 并且额外提供异步读写的功能
类中的两个原子变量 _closed _closing标记该socket的关闭开启状态
bool Update()函数在socket已关闭时返回false 否则返回true。 未启用IOCP时它还负责处理写入队列: 如果已有异步写入在等待(_isWritingAsync为true) 或者队列为空且没有标记延迟关闭 则直接返回 否则循环调用HandleQueue()同步发送队列中的数据
void AsyncRead() void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code, std::size_t))
则采取异步读取socket 调用默认函数ReadHandlerInternal() 或者输入函数T::*callback()
由于AsyncReadWithCallback 函数中bind 需要 T类的指针 所以才有开头的继承std::enable_shared_from_this<T>
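但是使用比较怪异 std::enable_shared_from_this<>用法一般是继承自己本身 而这里Socket<T>继承的是std::enable_shared_from_this<T>(T为派生类) 这样shared_from_this()直接返回std::shared_ptr<T> 既能绑定T的成员函数 也能绑定Socket<T>的成员函数。一般的写法如下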
// Minimal illustration of the usual pattern: the class derives from
// std::enable_shared_from_this instantiated with its own type.
class self : public std::enable_shared_from_this<self>
{
public:
    // Demonstrates binding a member function to a shared_ptr of this object.
    // shared_from_this() requires that *this is already owned by a
    // std::shared_ptr (e.g. created through std::make_shared<self>()).
    void test()
    {
        // only for test: the bound callable is discarded immediately
        std::bind(&self::test, shared_from_this());
    }
};
异步写write类似 ,由bool AsyncProcessQueue()函数发起
使用asio的async_write_some函数异步写入待发送的数据 并调用回调函数WriteHandler()(启用IOCP时)或者WriteHandlerWrapper()(未启用IOCP时)
不过需要结合MessageBuffer 一起跟进流程
类代码如下
1 /* 2 * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/> 3 * 4 * This program is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License as published by the 6 * Free Software Foundation; either version 2 of the License, or (at your 7 * option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 12 * more details. 13 * 14 * You should have received a copy of the GNU General Public License along 15 * with this program. If not, see <http://www.gnu.org/licenses/>. 16 */ 17 18 #ifndef __SOCKET_H__ 19 #define __SOCKET_H__ 20 21 #include "MessageBuffer.h" 22 #include "Log.h" 23 #include <atomic> 24 #include <queue> 25 #include <memory> 26 #include <functional> 27 #include <type_traits> 28 #include <boost/asio/ip/tcp.hpp> 29 30 using boost::asio::ip::tcp; 31 32 #define READ_BLOCK_SIZE 4096 33 #ifdef BOOST_ASIO_HAS_IOCP 34 #define TC_SOCKET_USE_IOCP 35 #endif 36 37 template<class T> 38 class Socket : public std::enable_shared_from_this<T> 39 { 40 public: 41 explicit Socket(tcp::socket&& socket) : _socket(std::move(socket)), _remoteAddress(_socket.remote_endpoint().address()), 42 _remotePort(_socket.remote_endpoint().port()), _readBuffer(), _closed(false), _closing(false), _isWritingAsync(false) 43 { 44 _readBuffer.Resize(READ_BLOCK_SIZE); 45 } 46 47 virtual ~Socket() 48 { 49 _closed = true; 50 boost::system::error_code error; 51 _socket.close(error); 52 } 53 54 virtual void Start() = 0; 55 56 virtual bool Update() 57 { 58 if (_closed) 59 return false; 60 61 #ifndef TC_SOCKET_USE_IOCP 62 if (_isWritingAsync || (_writeQueue.empty() && !_closing)) 63 return true; 64 65 for (; HandleQueue();) 66 ; 67 #endif 68 69 return true; 70 } 71 72 boost::asio::ip::address GetRemoteIpAddress() const 73 { 74 return 
_remoteAddress; 75 } 76 77 uint16 GetRemotePort() const 78 { 79 return _remotePort; 80 } 81 82 void AsyncRead() 83 { 84 if (!IsOpen()) 85 return; 86 87 _readBuffer.Normalize(); 88 _readBuffer.EnsureFreeSpace(); 89 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()), 90 std::bind(&Socket<T>::ReadHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); 91 } 92 93 void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code, std::size_t)) 94 { 95 if (!IsOpen()) 96 return; 97 98 _readBuffer.Normalize(); 99 _readBuffer.EnsureFreeSpace(); 100 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()), 101 std::bind(callback, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); 102 } 103 104 void QueuePacket(MessageBuffer&& buffer) 105 { 106 _writeQueue.push(std::move(buffer)); 107 108 #ifdef TC_SOCKET_USE_IOCP 109 AsyncProcessQueue(); 110 #endif 111 } 112 113 bool IsOpen() const { return !_closed && !_closing; } 114 115 void CloseSocket() 116 { 117 if (_closed.exchange(true)) 118 return; 119 120 boost::system::error_code shutdownError; 121 _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError); 122 if (shutdownError) 123 TC_LOG_DEBUG("network", "Socket::CloseSocket: %s errored when shutting down socket: %i (%s)", GetRemoteIpAddress().to_string().c_str(), 124 shutdownError.value(), shutdownError.message().c_str()); 125 126 OnClose(); 127 } 128 129 /// Marks the socket for closing after write buffer becomes empty 130 void DelayedCloseSocket() { _closing = true; } 131 132 MessageBuffer& GetReadBuffer() { return _readBuffer; } 133 134 protected: 135 virtual void OnClose() { } 136 137 virtual void ReadHandler() = 0; 138 139 bool AsyncProcessQueue() 140 { 141 if (_isWritingAsync) 142 return false; 143 144 _isWritingAsync = true; 145 146 #ifdef TC_SOCKET_USE_IOCP 147 MessageBuffer& buffer = 
_writeQueue.front(); 148 _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()), std::bind(&Socket<T>::WriteHandler, 149 this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); 150 #else 151 _socket.async_write_some(boost::asio::null_buffers(), std::bind(&Socket<T>::WriteHandlerWrapper, 152 this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); 153 #endif 154 155 return false; 156 } 157 158 void SetNoDelay(bool enable) 159 { 160 boost::system::error_code err; 161 _socket.set_option(tcp::no_delay(enable), err); 162 if (err) 163 TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for %s - %d (%s)", 164 GetRemoteIpAddress().to_string().c_str(), err.value(), err.message().c_str()); 165 } 166 167 private: 168 void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes) 169 { 170 if (error) 171 { 172 CloseSocket(); 173 return; 174 } 175 176 _readBuffer.WriteCompleted(transferredBytes); 177 ReadHandler(); 178 } 179 180 #ifdef TC_SOCKET_USE_IOCP 181 182 void WriteHandler(boost::system::error_code error, std::size_t transferedBytes) 183 { 184 if (!error) 185 { 186 _isWritingAsync = false; 187 _writeQueue.front().ReadCompleted(transferedBytes); 188 if (!_writeQueue.front().GetActiveSize()) 189 _writeQueue.pop(); 190 191 if (!_writeQueue.empty()) 192 AsyncProcessQueue(); 193 else if (_closing) 194 CloseSocket(); 195 } 196 else 197 CloseSocket(); 198 } 199 200 #else 201 202 void WriteHandlerWrapper(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/) 203 { 204 _isWritingAsync = false; 205 HandleQueue(); 206 } 207 208 bool HandleQueue() 209 { 210 if (_writeQueue.empty()) 211 return false; 212 213 MessageBuffer& queuedMessage = _writeQueue.front(); 214 215 std::size_t bytesToSend = queuedMessage.GetActiveSize(); 216 217 boost::system::error_code error; 218 std::size_t bytesSent = 
_socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error); 219 220 if (error) 221 { 222 if (error == boost::asio::error::would_block || error == boost::asio::error::try_again) 223 return AsyncProcessQueue(); 224 225 _writeQueue.pop(); 226 if (_closing && _writeQueue.empty()) 227 CloseSocket(); 228 return false; 229 } 230 else if (bytesSent == 0) 231 { 232 _writeQueue.pop(); 233 if (_closing && _writeQueue.empty()) 234 CloseSocket(); 235 return false; 236 } 237 else if (bytesSent < bytesToSend) // now n > 0 238 { 239 queuedMessage.ReadCompleted(bytesSent); 240 return AsyncProcessQueue(); 241 } 242 243 _writeQueue.pop(); 244 if (_closing && _writeQueue.empty()) 245 CloseSocket(); 246 return !_writeQueue.empty(); 247 } 248 249 #endif 250 251 tcp::socket _socket; 252 253 boost::asio::ip::address _remoteAddress; 254 uint16 _remotePort; 255 256 MessageBuffer _readBuffer; 257 std::queue<MessageBuffer> _writeQueue; 258 259 std::atomic<bool> _closed; 260 std::atomic<bool> _closing; 261 262 bool _isWritingAsync; 263 }; 264 265 #endif // __SOCKET_H__