基本管道只返回一个值:
class MyFirstPipeline(pipeline.Pipeline):
def run(self):
return "Hello World"
该值必须是 JSON 可序列化的。
如果您需要协调多个管道,则需要使用 生成器管道 和 yield 语句。
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
yield MyFirstPipeline()
您可以将管道的产生视为返回'future'。
您可以将此未来作为输入参数传递给另一个管道:
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
result = yield MyFirstPipeline()
yield MyOtherPipeline(result)
Pipeline API 将确保 MyOtherPipeline 的 run 方法仅在来自 MyFirstPipeline 的 result 未来被解析为实际值时才被调用。
您不能在同一方法中混合使用 yield 和 return。如果您使用 yield,则该值必须是 Pipeline 实例。如果您想这样做,这可能会导致问题:
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield results
在这种情况下,Pipeline API 只会在您的最后 yield results 行中看到一个 列表,因此它不知道在返回之前解决其中的期货,您会收到错误。
它们没有记录,但包含一个实用管道库,可以在这里提供帮助:
https://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py
所以上面的一个实际工作的版本看起来像:
import pipeline
from pipeline import common
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield common.List(*results)
现在我们没事了,我们正在生成一个管道实例,并且 Pipeline API 知道要正确解析它的未来值。 common.List管道的来源很简单:
class List(pipeline.Pipeline):
"""Returns a list with the supplied positional arguments."""
def run(self, *args):
return list(args)
...在调用 this 管道的 run 方法时,管道 API 已将列表中的所有项目解析为实际值,可以作为 *args 传入.
无论如何,回到你原来的例子,你可以这样做:
class FetchEntitites(pipeline.Pipeline):
def run(self, cursor=None)
if cursor is not None:
cursor = Cursor(urlsafe=cursor)
# I think it's ok to pass None as the cursor here, haven't confirmed
results, next_curs, more = MyModel.query().fetch_page(100,
start_cursor=cursor)
# queue up a task for the next page of results immediately
future_results = []
if more:
future_results = yield FetchEntitites(next_curs.urlsafe())
current_results = [ do some work on `results` ]
# (assumes current_results and future_results are both lists)
# this will have to wait for all of the recursive calls in
# future_results to resolve before it can resolve itself:
yield common.Extend(current_results, future_results)
进一步说明
一开始我说过我们可以将result = yield MyPipeline() 视为返回“未来”。严格来说这不是真的,显然我们实际上只是在产生实例化的管道。 (不用说我们的 run 方法现在是一个 generator 函数。)
Python 的 yield expressions 工作方式的奇怪之处在于,尽管看起来如此,yield 的值却在函数之外的某个地方(到Pipeline API 设备)而不是进入你的result var。表达式左侧的result var 的值也通过在生成器上调用send(生成器是您定义的run 方法)从函数外部 推入)。
因此,通过产生一个实例化的 Pipeline,您可以让 Pipeline API 获取该实例并在其他时间在其他地方调用其 run 方法(实际上它将作为类名和集合传递到任务队列args 和 kwargs 并在那里重新实例化...这就是为什么您的 args 和 kwargs 也需要 JSON 可序列化的原因。
同时,Pipeline API sends 是一个 PipelineFuture 对象到您的 run 生成器中,这就是您的 result var 中出现的内容。这似乎有点神奇和违反直觉,但这就是带有 yield 表达式的生成器的工作方式。
我费了一番脑筋才把它弄到这个水平,欢迎对我的错误进行任何澄清或更正。