转自:http://strawhatfy.github.io/2015/07/22/Tornado.gen/

引言

注:正文中引用的 Tornado 代码除特别说明外,都默认引用自 Tornado 4.0.1。

tornado.gen 模块是一个基于 python generator 实现的异步编程接口。通过该模块提供的 coroutine (注:这里 coroutine 指的是 ”协程” 概念而不是后面具体实现的 decorator:@gen.decorator),大大简化了在 Tornado 中编写异步代码的工作 —— 支持 “同步方式编写异步代码” ,避免编写烦人的回调函数。参考官方文档的例子,通常我们编写的异步代码如下:

1
2
3
4
5
6
7
8
9
10
class AsyncHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch("http://example.com",
callback=self.on_fetch)

def on_fetch(self, response):
do_something_with_response(response)
self.render("template.html")

而使用 tornado.gen 模块提供的 decorator ,在 Tornado 3.1 以前我们可以这样写异步代码:

1
2
3
4
5
6
7
8
class GenAsyncHandler(RequestHandler):
@asynchronous
@gen.engine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")

Tornado 3.1 及以上版本,可以直接使用 @gen.coroutine 来代替 @asynchronous:

1
2
3
4
5
6
7
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")

注:@asynchronous 在 tornado.web 中定义,对于使用了 @gen.coroutine 装饰的方法不需要再使用 @asynchronous 进行装饰,但同时使用前述 2 个 decorator 进行方法装饰也是合法的,在同时使用的情况下需要注意的是 @asynchronous 必须是第 1 个 decorator 。

很显然,采用同步方式编写的异步代码相比起分散在各处的异步回调函数代码,更利于代码的阅读和逻辑的组织。

该模块的实现非常巧妙也不容易理解,作为阅读 Tonardo 源码的笔记,我将在后面内容中结合源码和自己的理解对其实现进行分析。

@gen.coroutine 与 @gen.engine 的实现原理

tornado.gen 支持以同步方式编写异步代码的核心就是 python generator。其原理简单来说,就是通过 generator.next() 启动 yield 返回的 generator ,通过 IOLoop 与 generator.send(value) 驱动 generator 运行,以达到协调异步执行的目的。

从功能上来看, @gen.coroutine 与 @gen.engine 的功能非常相似,差别就在于二者对被装饰方法参数中的 “callback” 参数处理不一样以及具有不同的返回值。 @gen.coroutine 装饰的方法执行后返回 Future 对象并且会将方法参数中的 “callback” 加入到 Future 完成后的回调列表中;@gen.engine 装饰的方法执行后没有返回值(注:实际上如果被装饰方法有返回值,会抛出 ReturnValueIgnoredError 异常,详见后面的代码分析部分)。

所以,通过 @gen.engine 装饰的方法没有返回值,方法必须自己在异步调用完成后调用 “callback” 来执行回调动作,而通过 @gen.coroutine 装饰的方法则可以直接返回执行结果,然后由 gen 模块负责将结果传递给 “callback” 来执行回调。

注: 从调用者的角度来看 @gen.coroutine 可以视为 @tornado.concurrent.return_future与 @gen.engine 的组合。

@gen.coroutine 实现原理

@gen.coroutine 中充分利用了 generator 的特性,下面是其实现代码及分析。

1
2
3
def coroutine(func, replace_callback=True):
"""Decorator for asynchronous generators."""
return _make_coroutine_wrapper(func, replace_callback=True)

coroutine 内部直接委托 _make_coroutine_wrapper 完成具体功能(这段代码中 coroutine 的可选参数 “replace_callback” 是没有使用的),返回一个 Future 实例对象。

_make_coroutine_wrapper(func, replace_callback) 函数作为 @gen.coroutine 和 @gen.engine 内部实现,通过 replace_callback 的值来决定是否对 “callback” 方法参数进行处理。coroutine 的实现中通过 replace_callback=True 调用 _make_coroutine_wrapper 函数,会检查方法参数中是否有 “callback” 参数,如果有的话会将其加入到方法返回值 Future 的完成后回调列表中。如下面代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def _make_coroutine_wrapper(func, replace_callback):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = TracebackFuture()

# 处理 “callback”,忽略或者将其加入到 Future 的完成回调列表中。
if replace_callback and 'callback' in kwargs:
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))

try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
# 在 python 2 以及 python 3.3 以前,generator 中不能直接通过
# return 返回值:return 被视为 raise StopIteration(),
# return <something> 被视为raise StopIteration(<something>)。
# 在 gen 模块中,特别定义了 Return 类型用于返回值:raise gen.Return(something>)
result = getattr(e, 'value', None)
except Exception:
# 发生异常,异常被写入 future(将会被设置为完成状态),结束调用,返回 future
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, types.GeneratorType):
# 通过检查 result 是否为 GeneratorType 来选择是否创建 coroutine ,对于
# 同步情况直接 future.set_result(result) 返回,避免创建 coroutine 而
# 造成的性能损失。
# 与 Tornado 4.0 之前的版本比较,这里已经把顶层 ExceptionStackContext
# 的构建以及 Runner.run 的功能进行了重构,都迁移到了 Runner 实现中。
#
# 通过 next 启动 generator ,启动前记录上下文,启动后对上下文进行一致性检查。
# 若 generator 中有从 "with StackContext" 直接 “yield” 的代码逻辑,将抛
# 出 StackContextInconsistentError 异常。
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future.set_result(getattr(e, 'value', None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
try:
return future
finally:
# Subtle memory optimization: if next() raised an exception,
# the future's exc_info contains a traceback which
# includes this stack frame. This creates a cycle,
# which will be collected at the next full GC but has
# been shown to greatly increase memory usage of
# benchmarks (relative to the refcount-based scheme
# used in the absence of cycles). We can avoid the
# cycle by clearing the local variable after we return it.
#
# 代码注释中说,generator.next() 抛出异常失败后, future 的 exc_info
# 中会包含当前栈帧的引用,栈帧中也有对 future 的引用,这样导致一个环,必须
# 要在下一次 full GC 时才能回收内存。返回 future 后将 future 设置为 None
# 可以优化内存。(注:需要 full GC 是与 python 的垃圾回收实现采用引用计数
# 为主,标记-清除和分代机制为辅相关。python 采用引用计数来立刻释放可以释放
# 的内存,然后用标记-清除的方法来清除循环引用的不可达对象。)
future = None

# 同步情况下,不需要创建 coroutine,直接返回 future。
future.set_result(result)
return future
return wrapper

class Return(Exception):
def __init__(self, value=None):
super(Return, self).__init__()
self.value = value

注: 关于 CPython 的 GC 实现,这里有一篇不错的源码分析文章:Python垃圾回收机制

如下面的代码所示, IOLoop 的 add_future 方法会封装回调方法,在 Future 完成以后会将 “callback” 加入到 IOLoop 的回调列表中以等待 IOLoop 调度执行回调动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
def add_future(self, future, callback):
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.

The callback is invoked with one argument, the
`.Future`.
"""
assert is_future(future)
callback = stack_context.wrap(callback)
# 在 future 的完成回调列表中增加一个 lambda 表达式,负责在
# 将 “callback” 加入 IOLoop 调度执行。
future.add_done_callback(
lambda future: self.add_callback(callback, future))

从上面的代码分析中可以看到 _make_coroutine_wrapper 函数已经完成了 coroutine 的创建,其代码逻辑比较简单,而整个 coroutine 启动、运行的核心功能被实现在 Runner 类中。 Runner 有一个 run() 方法,该方法负责启动 coroutine,并与 IOLoop 配合驱动 YieldPoint(注:在 generator 中通过 yield 返回的实例类型,Tornado 4.0 及以后推荐使用 Futures 类型, YieldPoint 类型被放弃) 执行直到 result_future 完成。 run() 方法的详细代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future

# 当前 future 没有完成时直接返回,等待 IOLoop 在 future 完成后回调再执行
if not future.done():
return

# 当前 future 完成后对 coroutine 接下来运行没作用,立即释放
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
try:
value = future.result()
except Exception:
self.had_exception = True
yielded = self.gen.throw(*sys.exc_info())
else:
# 将 future 的结果赋值给当前 yield 表达式,驱动 generator 继续
# 执行, (如果generator未结束的话)返回下一个 yield 表达式结果
yielded = self.gen.send(value)
if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
# generator 执行完成,将 执行结果赋值给 result_future,返回
self.finished = True
self.future = _null_future

# Tornado 4.0 之前使用 YieldPoint 驱动,Callback 与 Wait/WaitAll
# 协调时,Callback 的回调结果需要 Runner 作为中转站,通过
# Runner.register_callback(key) 登记 Callback ,再通过
# YieldPoint.result_callback(key) 取回“设置(回调)方法”,
# 外部通过“设置(回调)方法”把结果保存到 Runner.results 字典中。
# Wait/WaitAll 通过 get_result(key) 取回 结果。
# YieldFuture 的实现也采用了相同的实现方式。
# Tornado 4.0 之后使用 Future 代替 YieldPoint,这些已经过时。
# 与 Yield 相关的代码都是为了向后兼容。
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning). If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.result_future.set_result(getattr(e, 'value', None))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
self._deactivate_stack_context()
return

# 继续处理 yield 表达式结果
if not self.handle_yield(yielded):
return
finally:
self.running = False

def handle_yield(self, yielded):
# 为了保持向后兼容,需要对多个 YieldPonit 和 Future 的混合集合做处理。
# 对于全是 Future 的集合类型使用新的 multi_future 函数进行封装处理;
# 不全是的使用 Multi 类进行封装,对于 Future 提供了 YieldFuture 适配器类。
# 详细的实现细节见 YieldFuture、Multi的实现代码。
# 若需要 run() 循环立即处理该 YieldPoint(被启动)/Future(已经完成) 则返
# 回 True,否则返回 False。
if isinstance(yielded, list):
if all(is_future(f) for f in yielded):
yielded = multi_future(yielded)
else:
yielded = Multi(yielded)
elif isinstance(yielded, dict):
if all(is_future(f) for f in yielded.values()):
yielded = multi_future(yielded)
else:
yielded = Multi(yielded)

# 针对第一个 YieldPoint 使用一个 ExceptionStackContext 上下文来处理
# StackContexts 中没有处理的异常,将未处理的异常记录到 result_future 中。
# 对于 Future 对象则没有必要, Future 提供了方法来记录异常和异常堆栈信息,
# 在 Future 完成后通过其 result() 方法获取结果(在 run 方法的调用)时会
# 再次抛出异常,这时可捕获记录到 result_future 中。
if isinstance(yielded, YieldPoint):
self.future = TracebackFuture()
def start_yield_point():
try:
yielded.start(self)
# 如果 yielded 已经完成,则将其结果赋值给 self.future,等待 run 循环处理;
# 若未就绪,则需要通过 Runner.set_result(key, value) 来进行赋值操作。
if yielded.is_ready():
self.future.set_result(
yielded.get_result())
else:
self.yield_point = yielded
except Exception:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())
if self.stack_context_deactivate is None:
# Start a stack context if this is the first
# YieldPoint we've seen.
with stack_context.ExceptionStackContext(
self.handle_exception) as deactivate:
self.stack_context_deactivate = deactivate
def cb():
start_yield_point()
self.run()
# 第 1 个 yielded 交由 IOLoop来启动
self.io_loop.add_callback(cb)
return False
else:

相关文章: