生成器作为协程(无线程)
让FakeFtp 和retrbinary 函数使用回调,每次成功读取数据块时都会调用:
class FakeFtp(object):
def __init__(self):
self.data = iter(["aaa", "bbb", "ccc", "ddd"])
def login(self, user, password):
self.user = user
self.password = password
def retrbinary(self, cmd, cb):
for chunk in self.data:
cb(chunk)
使用简单的回调函数有个缺点,就是反复调用,回调
函数不能轻易地在调用之间保持上下文。
以下代码定义了process_chunks 生成器,它将能够接收数据块之一
通过一个并处理它们。与简单的回调相比,这里我们可以保留所有
在一个函数内处理而不丢失上下文。
from contextlib import closing
from itertools import count
def main():
processed = []
def process_chunks():
for i in count():
try:
# (repeatedly) get the chunk to process
chunk = yield
except GeneratorExit:
# finish_up
print("Finishing up.")
return
else:
# Here process the chunk as you like
print("inside coroutine, processing chunk:", i, chunk)
product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
processed.append(product)
with closing(process_chunks()) as coroutine:
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
print("processed result", processed)
print("DONE")
要查看实际代码,请放入FakeFtp 类,代码显示在上面和下面一行:
main()
放入一个文件并调用它:
$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE
工作原理
processed = [] 在这里只是为了说明,生成器process_chunks 应该没有问题
配合其外部环境。全部包裹成def main():证明,不需要
使用全局变量。
def process_chunks() 是解决方案的核心。它可能有一个镜头输入参数(不是
此处使用),但主要的一点是,它接收输入的每个 yield 行返回任何人发送的内容
通过.send(data) 进入这个生成器的实例。可以coroutine.send(chunk),但在此示例中,它是通过引用此函数callback.send 的回调来完成的。
注意,在实际解决方案中,代码中有多个yields 是没有问题的,它们是
一一处理。这可能用于例如读取(并忽略)CSV 文件的标题,然后
继续处理带有数据的记录。
我们可以按如下方式实例化和使用生成器:
coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()
真正的代码是使用contextlibclosing上下文管理器来确保coroutine.close()是
总是被调用。
结论
此解决方案不提供某种迭代器来以传统样式“从
外部”。另一方面,我们能够:
- “从内部”使用生成器
- 将所有迭代处理保留在一个函数中,而不会在回调之间中断
- 可选择使用外部上下文
- 向外部提供有用的结果
- 所有这些都可以在不使用线程的情况下完成
致谢:该解决方案深受 user2357112
所写的 SO answer
Python FTP “chunk” iterator (without loading entire file into memory)
的启发