【问题标题】:Google AppEngine Pipelines API谷歌 AppEngine 管道 API
【发布时间】:2012-07-06 15:26:31
【问题描述】:

我想将我的一些任务重写为管道。主要是因为我需要一种方法来检测任务何时完成或按特定顺序启动任务。我的问题是我不确定如何将递归任务重写为管道。我所说的递归是指这样称呼自己的任务:

class MyTask(webapp.RequestHandler):
    def post(self):
        cursor = self.request.get('cursor', None)

        [set cursor if not null]
        [fetch 100 entities form datastore]

        if len(result) >= 100:
            [ create the same task in the queue and pass the cursor ]

        [do actual work the task was created for]

现在我真的很想把它写成一个管道并做一些类似的事情:

class DoSomeJob(pipeline.Pipeline):

   def run(self):
       with pipeline.InOrder():
          yield MyTask()
          yield MyOtherTask()
          yield DoSomeMoreWork(message2)

对此的任何帮助将不胜感激。谢谢!

【问题讨论】:

  • 你是用管道 API 解决的还是使用了其他方法?

标签: google-app-engine


【解决方案1】:

基本管道只返回一个值:

class MyFirstPipeline(pipeline.Pipeline):
    def run(self):
        return "Hello World"  

该值必须是 JSON 可序列化的。

如果您需要协调多个管道,则需要使用 生成器管道yield 语句。

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        yield MyFirstPipeline()

您可以将管道的产生视为返回'future'

您可以将此未来作为输入参数传递给另一个管道:

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        result = yield MyFirstPipeline()
        yield MyOtherPipeline(result)

Pipeline API 将确保 MyOtherPipelinerun 方法仅在来自 MyFirstPipelineresult 未来被解析为实际值时才被调用。

您不能在同一方法中混合使用 yieldreturn。如果您使用 yield,则该值必须是 Pipeline 实例。如果您想这样做,这可能会导致问题:

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield results

在这种情况下,Pipeline API 只会在您的最后 yield results 行中看到一个 列表,因此它不知道在返回之前解决其中的期货,您会收到错误。

它们没有记录,但包含一个实用管道库,可以在这里提供帮助:
https://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py

所以上面的一个实际工作的版本看起来像:

import pipeline
from pipeline import common

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield common.List(*results)

现在我们没事了,我们正在生成一个管道实例,并且 Pipeline API 知道要正确解析它的未来值。 common.List管道的来源很简单:

class List(pipeline.Pipeline):
    """Returns a list with the supplied positional arguments."""

    def run(self, *args):
        return list(args)

...在调用 this 管道的 run 方法时,管道 API 已将列表中的所有项目解析为实际值,可以作为 *args 传入.

无论如何,回到你原来的例子,你可以这样做:

class FetchEntitites(pipeline.Pipeline):
    def run(self, cursor=None)
        if cursor is not None:
            cursor = Cursor(urlsafe=cursor)

        # I think it's ok to pass None as the cursor here, haven't confirmed
        results, next_curs, more = MyModel.query().fetch_page(100,
                                                              start_cursor=cursor)

        # queue up a task for the next page of results immediately
        future_results = []
        if more:
            future_results = yield FetchEntitites(next_curs.urlsafe())

        current_results = [ do some work on `results` ]

        # (assumes current_results and future_results are both lists)
        # this will have to wait for all of the recursive calls in
        # future_results to resolve before it can resolve itself:
        yield common.Extend(current_results, future_results)

进一步说明

一开始我说过我们可以将result = yield MyPipeline() 视为返回“未来”。严格来说这不是真的,显然我们实际上只是在产生实例化的管道。 (不用说我们的 run 方法现在是一个 generator 函数。)

Python 的 yield expressions 工作方式的奇怪之处在于,尽管看起来如此,yield 的值却在函数之外的某个地方(到Pipeline API 设备)而不是进入你的result var。表达式左侧的result var 的值也通过在生成器上调用send(生成器是您定义的run 方法)从函数外部 推入)。

因此,通过产生一个实例化的 Pipeline,您可以让 Pipeline API 获取该实例并在其他时间在其他地方调用其 run 方法(实际上它将作为类名和集合传递到任务队列args 和 kwargs 并在那里重新实例化...这就是为什么您的 args 和 kwargs 也需要 JSON 可序列化的原因。

同时,Pipeline API sends 是一个 PipelineFuture 对象到您的 run 生成器中,这就是您的 result var 中出现的内容。这似乎有点神奇和违反直觉,但这就是带有 yield 表达式的生成器的工作方式。

我费了一番脑筋才把它弄到这个水平,欢迎对我的错误进行任何澄清或更正。

【讨论】:

  • 超级解说。 Pipeline API 很棒,但文档很糟糕。这是一个关于如何构建复杂管道的非常有用的示例,您事先不知道可能返回多少(或何时)结果,即必须递归生成任务
  • 如果您在项目中认真使用 Pipeline API,请记住 Google 似乎不支持 Python 版本,只有 Java 端在问题和提交方面有任何活动.状态用户界面也基本上完全坏了。我一直在使用这个分支,它解决了一些问题,至少看起来正在积极开发:github.com/Khan/appengine-mapreduce
  • 当您实例化管道p = MyPipeline(some_future) 时,您可以将future 作为arg 传递,但在管道的run 方法中,arg 将被解析为真实结果......所以输入您的电子邮件代码进入管道的run 方法,该管道接收之前所有内容的结果
  • 在上面的例子中,这意味着要么子类化common.Extend,要么你可以在最后做一个专门的额外管道步骤,例如final_result = common.Extend(future_results)yield SendEmailPipeline(final_result)
  • 看我的回复粘贴paste.openstack.org/show/478550(写在我的头顶,未经测试!)基本上你需要有另一个“层”管道来包装递归部分并只收集最终结果
【解决方案2】:

当您创建管道时,它会返回一个表示“阶段”的对象。您可以向舞台询问其 ID,然后将其保存。稍后,您可以根据保存的 id 重新构建阶段,然后询问阶段是否已完成。

查看http://code.google.com/p/appengine-pipeline/wiki/GettingStarted 并查找has_finalized。有一个示例可以满足您的大部分需求。

【讨论】:

  • 如果他们对递归任务和完成后收集数据的方式做更多的文档会非常有帮助。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-01-10
  • 2018-05-13
  • 2021-03-15
  • 2022-11-26
  • 2012-01-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多