Tornado 是一个非常优秀的非阻塞式 Web 服务器(同时也是 Web 框架),我们一般用它来编写 Web 服务,据说知乎就是用 Tornado 写的。
如果对tornado源码不是很了解,可以先看一下另一篇文章:
http://yunjianfei.iteye.com/blog/2185476
通过详细阅读理解Tornado的源码,你将会获得以下收获:
1. 这是一个绝佳的学习 Python 的机会,你会接触到 generator/yield、with statement、functools.partial、concurrent.futures 等很多平时较少接触到的知识;
2. 可以更好地使用 Tornado 编写异步 Server 以及 Client;
3. 更好地理解 epoll 以及 ET/LT(边缘触发/水平触发)相关知识。
本文旨在协助读者更好地阅读和理解 Tornado 的源码,提供一条跟踪理解源码的思路和顺序。
从一个例子开始
注意:在源码跟踪过程中,一共有51个步骤,请务必按照步骤,打开你手中对应的源码,进行跟踪分析。
以下是一个异步客户端的例子,作用是获取www.baidu.com首页的内容。那么开始我们的源码跟踪之旅。
在开始跟踪前,请准备好tornado源码,本文中的源码均只截取了部分关键性的代码。
文件名: async.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
"""Asynchronous client example: fetch the www.baidu.com home page.

The numbered comments (#0, #6, #18, #51) are the steps of the source-tracing
walkthrough in the accompanying article.
"""
from __future__ import print_function

from tornado import ioloop, httpclient, gen
from tornado.gen import Task
import pdb
import time
import logging


# Init logging
def init_logging():
    """Send DEBUG-and-above records from the root logger to stderr."""
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    sh = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    logging.info("Current log level is : %s", logging.getLevelName(logger.getEffectiveLevel()))


@gen.coroutine  # Note: this decorator is what makes the asynchronous client work.
def download(url):
    """Fetch *url*, print the body length, then stop the global IOLoop."""
    http_client = httpclient.AsyncHTTPClient()
    try:
        # 6. Run http_client.fetch(url) and leave download() until the
        #    gen.next / gen.send call of step 5 resumes it.
        # 51. Receive the response returned by www.baidu.com into `response`.
        response = yield http_client.fetch(url)
        print('response.length =', len(response.body))
    finally:
        # Stop the loop even if the fetch fails; otherwise the exception is
        # only stored on the future and ioloop.start() below never returns.
        ioloop.IOLoop.instance().stop()


if __name__ == "__main__":
    init_logging()
    # pdb.set_trace()
    future = download("http://www.baidu.com/")  # 0. Start of the source analysis.
    print(future)
    logging.info("****start ioloop*************")
    ioloop.IOLoop.instance().start()  # 18. Start the IOLoop.
|
注意:在 Tornado 4.4.x 中,这里有所修改:
在 async.py 第 23 行实例化 `http_client = httpclient.AsyncHTTPClient()` 时,可以看到 AsyncHTTPClient 继承自 Configurable(详见我的 IOLoop 实例化图):

    class AsyncHTTPClient(Configurable):
        def __new__(cls):
            # 实例化之前先执行 super(AsyncHTTPClient, cls).__new__,即执行基类的 __new__

而基类的 __new__ 中会调用 configurable_default,这里返回的是 SimpleAsyncHTTPClient 这个类(`return SimpleAsyncHTTPClient`)。所以上面的 http_client 实际上是 SimpleAsyncHTTPClient 的对象,查找 fetch_impl 等方法时要从 SimpleAsyncHTTPClient 开始找。它在实例化时直接封装了下面这个 resolver,后面会用到 OverrideResolver 的 resolve 方法:

    self.resolver = OverrideResolver(resolver=self.resolver, mapping=hostname_mapping)

    def resolve(self, host, port, *args, **kwargs):
        if (host, port) in self.mapping:
            host, port = self.mapping[(host, port)]
        elif host in self.mapping:
            host = self.mapping[host]
        return self.resolver.resolve(host, port, *args, **kwargs)
gen.py:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
def coroutine(func):
    """Decorator that runs generator function *func* as a coroutine.

    Abridged excerpt. Calling the decorated function returns a
    TracebackFuture. If *func* returns a generator, a Runner drives it, and
    the future is resolved with the value handed to ``final_callback``.
    Otherwise *func* finished without yielding, and its return value
    resolves the future immediately.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()
        # 1. Call the wrapped function (download). Because it contains
        #    `yield`, this returns a generator object; its body has not run yet.
        result = func(*args, **kwargs)
        if isinstance(result, types.GeneratorType):
            def final_callback(value):
                # `deactivate` is defined in code omitted from this excerpt.
                deactivate()
                future.set_result(value)
            runner = Runner(result, final_callback)
            # 2. Start the generator; it runs until it yields
            #    http_client.fetch(url).
            runner.run()
        else:
            # No generator means nothing to drive. Resolve the future with the
            # plain return value so anything waiting on it does not hang.
            future.set_result(result)
        return future
    return wrapper
class Runner(object):
    """Drives a generator-based coroutine from one yield point to the next.

    Abridged excerpt: __init__, pop_result and the error-handling paths of the
    real implementation are not shown here.
    """

    def set_result(self, key, result):
        """Store *result* under *key*, then resume the generator."""
        self.results[key] = result
        self.run()  # 47. Call run().

    def is_ready(self, key):
        """Return True once a result has been stored for pending *key*.

        Raises UnknownKeyError if *key* was never registered.
        """
        if key not in self.pending_callbacks:
            raise UnknownKeyError("key %r is not pending" % (key,))
        # 48.2 Step 46 stored the result via set_result, so this returns True.
        return key in self.results

    def result_callback(self, key):
        """Return a callback that stores its call arguments as *key*'s result."""
        def inner(*args, **kwargs):
            # Collapse the callback's arguments into a single result value.
            if kwargs or len(args) > 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)  # 46. Call set_result.
        return wrap(inner)  # `wrap` here is stack_context.wrap.

    def register_callback(self, key):
        """Mark *key* as pending; raise KeyReuseError if it already is."""
        if key in self.pending_callbacks:
            raise KeyReuseError("key %r is already pending" % (key,))
        self.pending_callbacks.add(key)

    def run(self):
        """Resume the generator until it reaches a yield point that is not ready."""
        while True:
            # 3. On the first pass this is _NullYieldPoint.is_ready(), which
            #    returns True.
            # 48. Now yield_point is the one stored in step 17.2
            #     (self.yield_point = yielded); it returns True, see 48.1.
            if not self.yield_point.is_ready():
                return
            # 4. On the first pass this is _NullYieldPoint.get_result(), which
            #    returns None.
            # 49. Fetch the result stored in step 46 (the response from
            #     www.baidu.com).
            next_value = self.yield_point.get_result()
            try:
                # 5. Run the generator (download). After it calls
                #    http_client.fetch(url) it yields and waits for the next
                #    gen.send / gen.next call.
                # 50. Send the www.baidu.com response into the `response`
                #     variable of step 6.
                yielded = self.gen.send(next_value)
            except StopIteration:
                # NOTE(review): abridged. The generator has finished, but this
                # excerpt never invokes the final_callback that coroutine()
                # passes to Runner, so the coroutine's future stays unresolved.
                return
            if isinstance(yielded, list):
                yielded = Multi(yielded)
            elif isinstance(yielded, Future):
                # Here `yielded` is the Future returned in step 7.1.
                # 17.1 Wrap that Future in a YieldFuture.
                yielded = YieldFuture(yielded)
            if isinstance(yielded, YieldPoint):  # YieldFuture subclasses YieldPoint.
                self.yield_point = yielded
                # 17.2 start() registers result_callback (see 17.2.1 and 17.2.2).
                self.yield_point.start(self)
class YieldFuture(YieldPoint):  # YieldFuture's base class is YieldPoint.
    """YieldPoint adapter that lets a coroutine ``yield`` a Future."""

    def __init__(self, future, io_loop=None):
        self.future = future
        # Fall back to IOLoop.current() when no loop is supplied.
        self.io_loop = io_loop or IOLoop.current()

    def start(self, runner):
        """Register with *runner* and arrange for it to resume when the future is done."""
        self.runner = runner
        # A fresh object() serves as this yield point's unique key.
        self.key = object()
        # 17.2.1 Add this YieldFuture's key to runner.pending_callbacks.
        runner.register_callback(self.key)
        # 17.2.2 Register the future with the IOLoop. This goes through
        # add_done_callback in concurrent.py and eventually delivers the
        # www.baidu.com response; see 17.2.3.
        on_done = runner.result_callback(self.key)
        self.io_loop.add_future(self.future, on_done)

    def is_ready(self):
        # 48.1 Delegate to runner.is_ready with this YieldFuture's key; see 48.2.
        return self.runner.is_ready(self.key)

    def get_result(self):
        """Pop the stored result for this key and unwrap it with .result()."""
        popped = self.runner.pop_result(self.key)
        return popped.result()
|
httpclient.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class AsyncHTTPClient(Configurable):
    """Non-blocking HTTP client (abridged excerpt: only fetch is shown)."""

    def fetch(self, request, callback=None, **kwargs):
        """Start fetching *request* and return a Future for the response.

        *request* may be an HTTPRequest or a URL string such as
        "http://www.baidu.com/". For a string, any extra keyword arguments
        are passed on to HTTPRequest.

        NOTE(review): this abridged excerpt does not use *callback*.
        """
        # A plain URL string (as passed by download() in async.py) has no
        # .headers attribute, so build an HTTPRequest from it first.
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        # Rebuild the headers as an httputil.HTTPHeaders object.
        request.headers = httputil.HTTPHeaders(request.headers)
        # Wrap the request so missing attributes fall back to self.defaults.
        request = _RequestProxy(request, self.defaults)
        # Create the Future (see
        # https://docs.python.org/3/library/concurrent.futures.html).
        future = Future()

        # Delivers the message returned by http://www.baidu.com/ to the
        # `response` variable in download().
        def handle_response(response):
            if response.error:
                future.set_exception(response.error)
            else:
                # 44. set_result assigns the `response` variable of step 6;
                #     see 44.1.
                future.set_result(response)

        self.fetch_impl(request, handle_response)  # 7. Call fetch's implementation.
        # 7.1 Note that fetch returns a Future object; pay special attention here.
        return future
|
simple_httpclient.py
Tornado 4.4.x 中的改动:
def _process_queue(self):
    """Start queued requests while fewer than max_clients are active."""
    with stack_context.NullContext():
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            # Skip entries whose key is no longer in self.waiting
            # (presumably timed out or cancelled -- confirm against caller).
            if key not in self.waiting:
                continue
            self._remove_timeout(key)
            self.active[key] = (request, callback)
            release_callback = functools.partial(self._release_fetch, key)
            self._handle_request(request, release_callback, callback)


def _connection_class(self):
    """Return the class instantiated for each request."""
    return _HTTPConnection


def _handle_request(self, request, release_callback, final_callback):
    """Instantiate the connection class to carry out *request*."""
    self._connection_class()(
        self.io_loop, self, request, release_callback,
        final_callback, self.max_buffer_size, self.tcp_client,
        self.max_header_size, self.max_body_size)