【问题标题】:Can Python task scheduler Luigi detect indirect dependencies?Python 任务调度器 Luigi 可以检测到间接依赖吗?
【发布时间】:2017-07-16 21:12:09
【问题描述】:

短版:

在 Python 中是否有一个任务调度器可以做 gmake 所做的事情?特别是,我需要一个递归解决依赖关系的任务调度程序。我查看了 Luigi,但它似乎只解决了直接依赖关系。

加长版:

我正在尝试构建一个以预定义顺序处理大量数据文件的工作流,后面的任务可能直接依赖于一些早期任务的输出,但反过来,这些输出的正确性依赖于甚至更早的任务。

例如,让我们考虑如下依赖映射:

A

当我从任务 C 请求结果时,Luigi 会自动调度 B,然后由于 B 依赖于 A,它会调度 A。所以最终的运行顺序是 [A,B,C]。每个任务都会创建一个正式的输出文件作为成功执行的标志。这对于第一次运行来说很好。

现在,假设我在任务 A 的输入数据中犯了一个错误。显然,我需要重新运行整个链。但是,简单地从 A 中删除输出文件是行不通的。因为 Luigi 看到 B 和 C 的输出,得出结论任务 C 的要求已经满足,不需要运行。我必须从依赖于 A 的任务中删除 ALL 的输出文件,以便它们再次运行。在简单的情况下,我必须删除 A、B 和 C 中的所有输出文件,以便 Luigi 检测到对 A 所做的更改。

这是一个非常不方便的功能。如果我有数十或数百个任务相互之间具有相当复杂的依赖关系,那么当其中一项任务需要重新运行时,真的很难判断哪些任务会受到影响。对于任务调度程序并具有解决依赖关系的能力,我希望 Luigi 能够像 GNU-Make 一样行事,其中递归检查依赖关系,并且当最深的源文件之一发生更改时,将重建最终目标。

我想知道是否有人可以就这个问题提供一些建议。我是否缺少 Luigi 中的一些关键功能?是否有其他任务调度程序可以充当 gmake?我对基于 Python 的包特别感兴趣,并且更喜欢那些支持 Windows 的包。

非常感谢!

【问题讨论】:

    标签: python dependencies scheduled-tasks luigi


    【解决方案1】:

    似乎可以通过覆盖您的任务的完整方法。您必须在依赖关系图中一直应用它。

    def complete(self):
        outputs = self.flatten(self.output())
        if not all(map(lambda output: output.exists(), outputs)):
            return False
        for task in self.flatten(self.requires()):
            if not task.complete():
                for output in outputs:
                    if output.exists():
                        output.remove()
                return False
        return True
    

    【讨论】:

    • 嗨@MattMcKnight,非常感谢您指出“完整”方法!我会试试这个解决方案。但是,我想知道为什么 Luigi 默认不这样做?使用 Python for 循环手动解决依赖关系可能效率不高,像我这样的普通用户可能会犯错误和/或做事欠佳。但无论如何,非常感谢! :)
    • 上述代码需要进行多次更正才能正常工作。首先,方法flatten是Luigi定义的,所以需要调用为self.flatten。其次,我发现最好在最后重用默认的complete方法来检查任务本身的状态,所以我写了return super(MyTask, self).complete(),假设继承的类名为MyTask。但是,在运行下游任务时,我仍然收到“文件已存在”错误,因为它们的输出文件没有被删除。知道如何自动更新这些输出文件吗?
    • 很抱歉没有尝试代码,只是想大致了解一种方法。我想除了从完整返回False 之外,还必须删除现有的输出。 Task.complete() 方法没有那么复杂。 github.com/spotify/luigi/blob/master/luigi/task.py#L526
    • 有一些类似清理类型进程的请求。 github.com/spotify/luigi/issues/595#issuecomment-194323344
    • 嗨@MattMcKnight,感谢您的更新!我发现 Luigi LocalTarget 有一个“删除”方法,可用于删除文件。因此,我将调用 output.remove(),而不是使用 os.remove 方法。我会接受这个答案,并发布我的最终测试版本。非常感谢您的帮助! :)
    【解决方案2】:

    确实这很不方便,d6tflow 检查所有上游依赖项的完整性,而不仅仅是 TaskC 的输出。如果你重置TaskA,TaskC也会不完整,自动重新运行。

    # reset TaskA => makes TaskC incomplete
    TaskA().invalidate() 
    d6tflow.preview(TaskC()) # all tasks pending
    

    有关详细信息,请参阅下面的完整示例和d6tflow docs

    import d6tflow
    import pandas as pd
    
    class TaskA(d6tflow.tasks.TaskCachePandas):  # save dataframe in memory
    
        def run(self):        
            self.save(pd.DataFrame({'a':range(10)})) # quickly save dataframe
    
    class TaskB(d6tflow.tasks.TaskCachePandas):
    
        def requires(self):
            return TaskA() # define dependency
    
        def run(self):
            df = self.input().load() # quickly load required data
            df = df*2
            self.save(df)
    
    class TaskC(d6tflow.tasks.TaskCachePandas):
    
        def requires(self):
            return TaskB()
    
        def run(self):
            df = self.input().load() 
            df = df*2
            self.save(df)
    
    # Check task dependencies and their execution status
    d6tflow.preview(TaskC())
    '''
    └─--[TaskC-{} (PENDING)]
       └─--[TaskB-{} (PENDING)]
          └─--[TaskA-{} (PENDING)]
    '''
    
    # Execute the model training task including dependencies
    d6tflow.run(TaskC())
    
    '''
    ===== Luigi Execution Summary =====
    
    Scheduled 3 tasks of which:
    * 3 ran successfully:
        - 1 TaskA()
        - 1 TaskB()
        - 1 TaskC()
    '''
    
    # all tasks complete
    d6tflow.preview(TaskC())
    
    '''
    └─--[TaskC-{} (COMPLETE)]
       └─--[TaskB-{} (COMPLETE)]
          └─--[TaskA-{} (COMPLETE)]
    '''
    
    # reset TaskA => makes TaskC incomplete
    TaskA().invalidate() 
    d6tflow.preview(TaskC())
    '''
    └─--[TaskC-{} (PENDING)]
       └─--[TaskB-{} (PENDING)]
          └─--[TaskA-{} (PENDING)]
    '''
    

    【讨论】:

    • 您能否简要评论一下该库用于解决递归依赖关系的算法的效率?这个库可以在没有明显/明显延迟的情况下处理的任务池的典型大小是多少?
    • 不确定,这在实践中从来都不是问题,我已经用它构建了大规模的机器学习系统。工作发生在任务内部,图书馆只是协调。一位用户训练了 20K ML 模型。
    猜你喜欢
    • 2015-05-01
    • 1970-01-01
    • 2015-06-24
    • 1970-01-01
    • 2020-05-10
    • 1970-01-01
    • 1970-01-01
    • 2016-04-28
    • 1970-01-01
    相关资源
    最近更新 更多