前言
tornado中http1connection文件的作用极其重要,他实现了http1.x协议。
本模块基于gen模块和iostream模块实现异步的处理请求或者响应。
阅读本文需要一些基础的http知识。
正文:
http协议是建立在tcp基础上的应用层协议,tcp层由TCPServer,IOStream负责,对http报文的读取与解析则由http1connection负责。当http报文被解析完成再交由给某个delegate类实例负责进行后续处理。
接下来说一下大体的过程(tornado作为服务器端):
如果客户端要向服务端发送http请求,首先要建立tcp连接,
tornado在连接建立完成后会将连接封装为一个IOStream对象,这个对象可以异步的从连接中读写数据
tornado中又实现了HTTP1ServerConnection与HTTP1Connection两个类,他们依赖于底层的IOStream从套接字中读写,并共同合作完成了http1.x协议。
HTTP1Connection实际主要是用来处理http事务(http权威指南:http事务是由一条请求以及对应该请求的响应组成),当然他自己实现了前半部分,也就是对报文起始行、首部、主体进行读,解析;后半部分需要配合HTTPMessageDelegate进行工作。
HTTPMessageDelegate对经过HTTP1Connection解析后的报文进行分配,然后由其他类(比如说RequestHandler)执行具体的业务逻辑。
其他类生成响应,并且把响应发送到IOStream中,这时就表示着这条http事务已经完成,我们需要根据情况判断是否关闭连接。
HTTP1ServerConnection则是不断的生成HTTP1Connection实例,也就是不断的处理http事务,直到连接关闭。
HTTP1Connection类:
先看HTTP1Connection这个类。这个类实际上主要完成的工作可以概括为两个:
1 读取并解析报文消息
2 写入报文
1 读取并解析报文
当请求或者响应到来的时候,read_response是解析消息的入口,尽管该方法读起来好像仅仅是针对响应,但因为不管是请求或者是响应格式是相差不大的,所以不管是请求或者是响应他都是可以处理的。read_response中主要调用了_read_message方法,解析报文的逻辑也都是在_read_message方法中,另外,本文主要是对其作为服务端时进行分析
先来说一下_read_mssage的大体逻辑:
首先HTTP1Connection基于iostream读取请求报文,并对请求报文进行解析,分离出起始行 请求首部,并根据请求首部判定是否读取消息主体以及消息主体的长度。
在这个过程中,分析起始行的信息然后委托给代理(HTTPMessageDelegate)获取对应的RequestHandler(这一步主要是_RequestDispatcher类实现的),并实例化,根据起始行的method调用相关方法,在调用方法执行业务逻辑时可能会用到模板语言,cookie,csrf等等其他东西,但最终会产生响应,并将响应发送到IOStream中。这些工作都是delegate干的。
最后HTTP1Connection等待响应发送完成(这一步操作是异步的),根据是否支持keep-alive决定是否处理完后关闭连接
def read_response(self, delegate): """Read a single HTTP response. """ if self.params.decompress: delegate = _GzipMessageDelegate(delegate, self.params.chunk_size) return self._read_message(delegate) @gen.coroutine def _read_message(self, delegate): # self是HTTP1Connection实例对象,delegate是_ServerRequestAdapter实例对象 need_delegate_close = False try: header_future = self.stream.read_until_regex(b"\r?\n\r?\n", max_bytes=self.params.max_header_size) # 两种方式来等待请求头(在服务器模式下是请求,客户端模式下是响应)的读,第一种是什么时候发送过来什么时候读, # 第二种是设置超时时间时长的定时任务,如果这段时间内没有发送过来那么就关闭连接 if self.params.header_timeout is None: # 第一种 header_data = yield header_future # 获取起始行以及头部信息的bytes流 else: # 第二种 try: header_data = yield gen.with_timeout( self.stream.io_loop.time() + self.params.header_timeout, header_future, io_loop=self.stream.io_loop, quiet_exceptions=iostream.StreamClosedError) except gen.TimeoutError: self.close() raise gen.Return(False) start_line, headers = self._parse_headers(header_data) # 获取起始行以及头部信息 if self.is_client: start_line = httputil.parse_response_start_line(start_line) self._response_start_line = start_line else: start_line = httputil.parse_request_start_line(start_line) self._request_start_line = start_line self._request_headers = headers self._disconnect_on_finish = not self._can_keep_alive(start_line, headers) # 如果不是keep alive在响应结束后关闭连接 need_delegate_close = True with _ExceptionLoggingContext(app_log): # 这一步会做了很多东西,如果是服务器端,这一步会设置request对象,并根据请求中的url选择对应handler header_future = delegate.headers_received(start_line, headers) if header_future is not None: yield header_future if self.stream is None: # We've been detached. need_delegate_close = False raise gen.Return(False) skip_body = False if self.is_client: # 作为client if (self._request_start_line is not None and self._request_start_line.method == 'HEAD'): # 如果方法是HEAD,那么默认是没有主体的。即使有主体也会被忽略掉 skip_body = True code = start_line.code # 如果客户端发送了一个带条件的GET 请求且该请求已被允许,而文档的内容(自上次访问以来或者根据请求的条件)并没有改变,则服务器应当返回这个304状态码 if code == 304: # 304报文可能会包含content-length首部属性,但实际上是没有消息主体的 # http://tools.ietf.org/html/rfc7230#section-3.3 skip_body = True if code >= 100 and code < 200: # 临时的响应。客户端在收到常规响应之前,应准备接收一个或多个1XX响应 # 1xx 报文是不能包含主体信息的 if ('Content-Length' in headers or 'Transfer-Encoding' in headers): raise httputil.HTTPInputError( "Response code %d cannot have body" % code) # 我们所需要的真正的响应还没有接收到,所以继续接收 yield self._read_message(delegate) else: # 1、Expect:100-continue 用于客户端在发送POST数据给服务器前,征询服务器情况,看服务器是否处理POST的数据, # 如果不处理,客户端则不上传POST数据,如果处理,则POST上传数据。在现实应用中,通过在POST大数据时,才会使用100-continue协议。 # http://www.cnblogs.com/tekkaman/archive/2013/04/03/2997781.html if (headers.get("Expect") == "100-continue" and not self._write_finished): # 默认是支持的,所以收到请求后,返回100。 self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n") if not skip_body:
# 这一步读取消息主体 body_future = self._read_body(start_line.code if self.is_client else 0, headers, delegate) if body_future is not None: if self._body_timeout is None: yield body_future else: try: yield gen.with_timeout( self.stream.io_loop.time() + self._body_timeout, body_future, self.stream.io_loop, quiet_exceptions=iostream.StreamClosedError) except gen.TimeoutError: gen_log.info("Timeout reading body from %s", self.context) self.stream.close() raise gen.Return(False) self._read_finished = True if not self._write_finished or self.is_client: need_delegate_close = False with _ExceptionLoggingContext(app_log): # 如果是服务器端,这一步会生成对应的handler实例,然后执行业务逻辑,最后将响应写入IOStream中 delegate.finish() # If we're waiting for the application to produce an asynchronous # response, and we're not detached, register a close callback # on the stream (we didn't need one while we were reading) # 等待异步响应完成,所有数据都写入 fd,才继续后续处理,详细见 _finish_request/finish 方法实现。 # 当异步写完成,在HTTPServerRequest中调用当前对象的finish方法,finish方法则会调用_finisth_request方法,该方法内部会对_finish_future对象set_result if (not self._finish_future.done() and self.stream is not None and not self.stream.closed()): self.stream.set_close_callback(self._on_connection_close) yield self._finish_future # 判定是否关闭连接,服务器端一般等待客户端主动关闭,而如果是客户端则根据是否持久连接进行关闭 if self.is_client and self._disconnect_on_finish: self.close() if self.stream is None: raise gen.Return(False) except httputil.HTTPInputError as e: gen_log.info("Malformed HTTP message from %s: %s", self.context, e) self.close() raise gen.Return(False) finally: if need_delegate_close: with _ExceptionLoggingContext(app_log): delegate.on_connection_close() self._clear_callbacks() raise gen.Return(True)
值得注意的是,该方法中读取报文主体的几种方式:
1 假设没有开启keep-alive,那么我们将连接结束作为报文终止的标志
2 而如果开启了keep-alive,那么我们根据Content-Length确定当前报文的终止位置
3 如果开启了分块传输编码(Transfer-Encoding:chunked,这时候Content-Length就不起作用了,实际上在tornado中如果Content-Length以及分块传输编码都指定则会返回错误)那么就会根据分块传输编码的格式一直读取,直到读取到b"0\r\n"时就可以确定当前报文已终止
_read_body方法则根据情况选择读取报文主体的方式,以上三种选择分别对应于以下三种方法:
1 _read_body_until_close
2 _read_fixed_body
3 _read_chunked_body
来看一下代码:
def _read_body(self, code, headers, delegate): # https://imququ.com/post/transfer-encoding-header-in-http.html if "Content-Length" in headers: if "Transfer-Encoding" in headers: # Response cannot contain both Content-Length and # Transfer-Encoding headers. # If a message is received with both a Transfer-Encoding and a # Content-Length header field, the Transfer-Encoding overrides the # Content-Length. # http://tools.ietf.org/html/rfc7230#section-3.3.3 raise httputil.HTTPInputError("Response with both Transfer-Encoding and Content-Length") if "," in headers["Content-Length"]: # Proxies sometimes cause Content-Length headers to get # duplicated. If all the values are identical then we can # use them but if they differ it's an error. pieces = re.split(r',\s*', headers["Content-Length"]) if any(i != pieces[0] for i in pieces): raise httputil.HTTPInputError( "Multiple unequal Content-Lengths: %r" % headers["Content-Length"]) headers["Content-Length"] = pieces[0] try: content_length = int(headers["Content-Length"]) except ValueError: # Handles non-integer Content-Length value. raise httputil.HTTPInputError( "Only integer Content-Length is allowed: %s" % headers["Content-Length"]) if content_length > self._max_body_size: raise httputil.HTTPInputError("Content-Length too long") else: content_length = None if code == 204: # 状态码204(无内容) # This response code is not allowed to have a non-empty body, # and has an implicit length of zero instead of read-until-close. # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3 if ("Transfer-Encoding" in headers or content_length not in (None, 0)): raise httputil.HTTPInputError( "Response with code %d should not have body" % code) content_length = 0 if content_length is not None: # 而如果开启了keep-alive,那么我们根据Content-Length确定当前报文的终止位置 return self._read_fixed_body(content_length, delegate) if headers.get("Transfer-Encoding") == "chunked": # 开启了分块传输编码 return self._read_chunked_body(delegate) if self.is_client: # 非持久连接 return self._read_body_until_close(delegate) return None @gen.coroutine def _read_fixed_body(self, content_length, delegate): while content_length > 0: body = yield self.stream.read_bytes(min(self.params.chunk_size, content_length), partial=True) content_length -= len(body) if not self._write_finished or self.is_client: with _ExceptionLoggingContext(app_log): ret = delegate.data_received(body) if ret is not None: yield ret @gen.coroutine def _read_chunked_body(self, delegate): # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1 total_size = 0 while True: # 先读取chunk长度 chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64) chunk_len = int(chunk_len.strip(), 16) # 如果chunk长度为0,表示分块传输的终止 if chunk_len == 0: return total_size += chunk_len # 检测长度是否过大 if total_size > self._max_body_size: raise httputil.HTTPInputError("chunked body too large") bytes_to_read = chunk_len while bytes_to_read: # 读取该长度对应的data chunk = yield self.stream.read_bytes( min(bytes_to_read, self.params.chunk_size), partial=True) bytes_to_read -= len(chunk) if not self._write_finished or self.is_client: with _ExceptionLoggingContext(app_log): # 读取的消息主体要交给代理(HTTPMessageDelegate)处理 ret = delegate.data_received(chunk) if ret is not None: yield ret # chunk ends with \r\n # 每一个data后面都有一个CRLF crlf = yield self.stream.read_bytes(2) assert crlf == b"\r\n" @gen.coroutine def _read_body_until_close(self, delegate): body = yield self.stream.read_until_close() if not self._write_finished or self.is_client: with _ExceptionLoggingContext(app_log): delegate.data_received(body)