【问题标题】:Release `resource` in a Luigi pipeline在 Luigi 管道中释放“资源”
【发布时间】:2017-03-08 16:33:38
【问题描述】:

我有一个 Luigi 管道,其中包含我批量运行的任务图。其中一些任务依赖于昂贵的资源(例如 AWS EC2 机器集群或其他昂贵的资源)。

我正在尝试以一种聪明的方式使用这个resource,以便我在运行任务之前acquire它,并在所有任务完成后立即release它。一般来说,昂贵的资源在管道的开始分配,依赖图的中途可以很好地释放。

有没有一种有效的方法在 Luigi 中对此进行建模,以实现资源的 aquirerelease

根据AquireRelease luigi.Tasks 对其进行建模并不是最优的,因为它给我的图表增加了很多复杂性和不必要的边。理想情况下,scheduler 会检查它的state,当没有更多需要资源的RUNNINGPENDING 任务时,它可以release 它。

这是否已经存在,或者我必须自己将此功能添加到 Luigi?

【问题讨论】:

    标签: python pipeline luigi


    【解决方案1】:

    Luigi 支持资源的概念,即在任务被调度并运行时将占用它,然后在任务完成时释放。为什么需要更高级的方法。

    来自doc

    此部分可以包含任意键。这些中的每一个都指定了调度程序可以允许工作人员使用的全局资源的数量。调度程序将防止运行具有指定资源的作业超过本节中的计数。假定未指定的资源具有限制 1。具有 2 个 hive 资源和 1 个 mysql 资源的配置的示例资源部分:

    [resources]
    hive: 2
    mysql: 1
    

    在您的情况下,您可以通过使用 aws : 1 来对此进行建模。然后,该资源一次只能使用一次。现在,如果您想要控制任务的安排方式,那么您可以尝试使用优先级。

    然后您可以在任务中添加:

    resources = {"aws": 1}
    

    据我所知,没有办法直接获取资源,因为这是由任务执行控制的。

    已更新(来自评论):

    例如,如果您想要执行某些事情(即清理),您可能需要为此使用事件处理程序。

     @luigi.Task.event_handler(luigi.Event.FAILURE)
     def handler_failure(self, exception):
           # do cleanups
    

    查看完整的documentation 了解更多信息。

    【讨论】:

    • 感谢您的回答,@mfcabrera。我已经在我的管道中使用了这个 Luigi 资源的概念。然而,这仅在给定资源耗尽时调度程序不启动许多任务时才有用。假设 EC2 一次只能处理 1 个 Luigi 任务,而您有 4 个工作人员,那么这很有用。然而,它与实际资源的实际获取和释放无关 - 例如 EC2。在您的示例中,您将如何实现实际 AWS 资源(例如任务 StartCluster 和 StopCluster)的启动和停止,尤其是在失败和禁用任务的情况下?
    • 具体来说,在我当前的管道中,StopCluster 是 DAG 中的最后一个任务。因此,DAG 中间的任何故障和上游禁用都会导致 StopCluster 不被调用,这意味着我无法释放 EC2 实例以防出错。
    • @AntonEvangelatov 您尝试过事件处理程序吗?我会更新答案。也许这有帮助。
    • 还没有。不清楚是在每次失败后触发此回调,还是最终在任务失败重试次数后触发。目前 StopCluster 被建模为所有可能失败的任务的包装器。如果我可以动态更新 StopCluster 的依赖项(例如,如果失败 3 次,则删除失败的任务)可能会解决它。
    猜你喜欢
    • 2015-05-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-15
    • 1970-01-01
    • 1970-01-01
    • 2012-01-08
    • 2012-05-04
    相关资源
    最近更新 更多