Tornado是很优秀的非阻塞式服务器,我们一般用它来写Web 服务器,据说知乎就是用Tornado写的。 

 

如果对tornado源码不是很了解,可以先看一下另一篇文章:

http://yunjianfei.iteye.com/blog/2185476

 

通过详细阅读理解Tornado的源码,你将会获得以下收获:

1. 这是一个绝佳的学习python的机会,你会接触到generator/yield , with statment, functools.partial,  concurrent.futures 等等很多平时较少接触到的只是

2. 可以更好的通过tornado来编写异步Server以及client

3. 更好的理解epoll,ET/LT相关知识

 

本文可以协助更好的去阅读理解tornado的源码,提供一个跟踪理解源码的思路和顺序。

 

从一个例子开始

 

注意:在源码跟踪过程中,一共有51个步骤,请务必按照步骤,打开你手中对应的源码,进行跟踪分析。

 

以下是一个异步客户端的例子,作用是获取www.baidu.com首页的内容。那么开始我们的源码跟踪之旅。

在开始跟踪前,请准备好tornado源码,本文中的源码均只截取了部分关键性的代码。

 

文件名: async.py

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env python2.7 
# -*- coding: utf-8 -*- 
from tornado import ioloop, httpclient, gen 
from tornado.gen import Task 
import pdb, time, logging 
#Init logging 
def init_logging(): 
    logger = logging.getLogger() 
    logger.setLevel(logging.DEBUG) 
    sh = logging.StreamHandler() 
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s'
    sh.setFormatter(formatter) 
   
    logger.addHandler(sh) 
    logging.info("Current log level is : %s", logging.getLevelName(logger.getEffectiveLevel())) 
   
init_logging() 
   
#pdb.set_trace() 
  
@gen.coroutine #注意这里是一个装饰器,是实现异步client的关键 
def download(url): 
    http_client = httpclient.AsyncHTTPClient() 
   
    #6. 执行http_client.fetch(url),然后退出download函数,等待下次步骤5中的gen.next或者gen.send调用 
    #51. 获取从www.baidu.com返回的响应,赋值给response 
    response = yield http_client.fetch(url)  
    print 'response.length =', len(response.body) 
    ioloop.IOLoop.instance().stop() 
   
future = download("http://www.baidu.com/"#0. 开始源码分析 
print future 
logging.info("****start ioloop*************"
ioloop.IOLoop.instance().start() #18. 启动ioloop 

注意 :这里4.4x有修改:

在async.py 23行实例化
http_client = httpclient.AsyncHTTPClient()  

我们看到继承了Configurable 详细看我的IOLoop实例化图
class AsyncHTTPClient(Configurable):
    
    
    def __new__(cls) #实例化之前先执行
        super(AsyncHTTPClient,cls).__new__ 执行基类的new
而基类中也会执行configurable_default 
这里返回的是        return SimpleAsyncHTTPClient
这个类
所以上面的http_client其实是SimpleAsyncHTTPClient的对象

所以找方法从其开始找
fetch_impl

其实例化时候直接封装了下面的返回结果 后面会用到OverrideResolver   resolve方法
self.resolver = OverrideResolver(resolver=self.resolver,
                                             mapping=hostname_mapping)

    def resolve(self, host, port, *args, **kwargs):
        if (host, port) in self.mapping:
            host, port = self.mapping[(host, port)]
        elif host in self.mapping:
            host = self.mapping[host]
        return self.resolver.resolve(host, port, *args, **kwargs)
4.4.x的改动

 

 

 

   gen.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def coroutine(func): 
    @functools.wraps(func) 
    def wrapper(*args, **kwargs): 
        future = TracebackFuture() 
           
        #1.返回download函数入口点(函数类型为generator) 
        result = func(*args, **kwargs)   
        if isinstance(result, types.GeneratorType): 
            def final_callback(value): 
                deactivate() 
                future.set_result(value) 
            runner = Runner(result, final_callback) 
               
            #2.启动generator调用http_client.fetch(url) 
            runner.run()  
            return future 
    return wrapper 
   
class Runner(object): 
    def set_result(self, key, result): 
        self.results[key] = result 
        self.run() #47. 调用run 
       
    def is_ready(self, key): 
        if key not in self.pending_callbacks: 
            raise UnknownKeyError("key %r is not pending" % (key,)) 
        #48.2 因46.步中调用set_result设定了对应的result,所以返回True 
        return key in self.results  
           
    def result_callback(self, key): 
        def inner(*args, **kwargs): 
            if kwargs or len(args) > 1
                result = Arguments(args, kwargs) 
            elif args: 
                result = args[0
            else
                result = None 
            self.set_result(key, result) #46. 调用set_result 
        return wrap(inner)  #这里的wrap是stack_context.py中的wrap 
       
    def register_callback(self, key): 
        if key in self.pending_callbacks: 
            raise KeyReuseError("key %r is already pending" % (key,)) 
        self.pending_callbacks.add(key) 
           
    def run(self): 
        while True
            #3.第一次调用的是_NullYieldPoint中的is_ready,返回True 
            #48. 此时的yield_point是17.2步中self.yield_point = yielded设置的,返回True,参照48.1 
            if not self.yield_point.is_ready():                         
                return 
            #4.第一次调用的是_NullYieldPoint中的get_result,返回None 
            #49. 获取46步中设定的result(即从www.baidu.com返回的响应) 
            next = self.yield_point.get_result()  
                                                    
            try
            #5.开始执行generator函数(即download),执行http_client.fetch(url)后跳出download,等待下一次调用gen.send或者gen.next 
            #50.给第6步的response变量设置从www.baidu.com返回的响应 
                yielded = self.gen.send(next)                          
            except StopIteration: 
                return 
                   
            if isinstance(yielded, list): 
                yielded = Multi(yielded) 
            elif isinstance(yielded, Future): #注意:这里的yielded是7.1返回的Future对象 
                #17.1  生成YieldFuture对象,参数为7.1返回的Future对象 
                yielded = YieldFuture(yielded)     
               
            if isinstance(yielded, YieldPoint): #YieldFuture的父类是YieldPoint 
                self.yield_point = yielded 
                #17.2 调用start函数,注册result_callback(注意17.2.1和17.2.2) 
                self.yield_point.start(self)  
                   
class YieldFuture(YieldPoint): #YieldFuture的父类是YieldPoint 
    def __init__(self, future, io_loop=None): 
        self.future = future 
        self.io_loop = io_loop or IOLoop.current() 
   
    def start(self, runner): 
        self.runner = runner 
        self.key = object() 
        #17.2.1 将YieldFuture对象key追加到pending_callbacks中 
        runner.register_callback(self.key) 
           
        #17.2.2 在ioloop中注册一个future,调用concurrent.py中add_done_callback,用于最终返回www.baidu.com的响应,请查看17.2.3 
        self.io_loop.add_future(self.future, runner.result_callback(self.key)) 
              
    def is_ready(self): 
        #48.1 调用runner中的is_ready, key为YieldFuture对象key,参照48.2 
        return self.runner.is_ready(self.key)  
   
    def get_result(self): 
        return self.runner.pop_result(self.key).result() 

  

 

httpclient.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class AsyncHTTPClient(Configurable): 
    def fetch(self, request, callback=None, **kwargs): 
        request.headers = httputil.HTTPHeaders(request.headers) 
        #根据参数"http://www.baidu.com/"生成request对象 
        request = _RequestProxy(request, self.defaults) 
           
        #初始化一个Future对象,Future参照:https://docs.python.org/3/library/concurrent.futures.html 
        future = Future()  
           
        #将http://www.baidu.com/返回的消息设置给download函数中的response参数 
        def handle_response(response): 
            if response.error: 
                future.set_exception(response.error) 
            else
                #44. 通过set_result来给第6步中的response变量赋值, 参照44.1 
                future.set_result(response)  
                   
        self.fetch_impl(request, handle_response)   #7.调用fetch的实现函数 
        return future  #7.1 注意fetch返回的是Future对象,这里需要特别关注 

  

  

 

 simple_httpclient.py

4.4.x改动

 

 1     def _process_queue(self):
 2         with stack_context.NullContext():
 3             while self.queue and len(self.active) < self.max_clients:
 4                 key, request, callback = self.queue.popleft()
 5                 if key not in self.waiting:
 6                     continue
 7                 self._remove_timeout(key)
 8                 self.active[key] = (request, callback)
 9                 release_callback = functools.partial(self._release_fetch, key)
10                 self._handle_request(request, release_callback, callback)
11 
12     def _connection_class(self):
13         return _HTTPConnection
14     def _handle_request(self, request, release_callback, final_callback):
15         self._connection_class()(
16             self.io_loop, self, request, release_callback,
17             final_callback, self.max_buffer_size, self.tcp_client,
18             self.max_header_size, self.max_body_size)
View Code

相关文章: