【问题标题】:Luigi - Executing 2 pipeline jobs, (Must be in SYNC, not paralell)Luigi - 执行 2 个管道作业,(必须是同步的,不是并行的)
【发布时间】:2017-04-04 17:40:56
【问题描述】:

我正在开发 Luigi 框架,我想在一个类中执行 2 个作业(两者都是管道作业),但是当 Job1 完全执行时,Job2 必须只运行。

class ExecuteTwoJobs(luigi.Task):
    def requires(self):
        reqs = []
        reqs.append(Job1(*args, **kwargs))
        reqs.append(Job2(*args, **kwargs))
        return reqs

    def output(self):
        //statements

    def run(self):
        //statements

有什么办法可以执行job1,一旦完成,就去执行Job2。

非常感谢任何帮助

【问题讨论】:

    标签: python-2.7 luigi


    【解决方案1】:

    这样做的方法是运行Job2,Job2需要Job1。

    class Job2(luigi.Task):
    
        def requires(self):
            yield Job1(*args, **kwargs))
    
        def output(self):
            //statements
    
        def run(self):
            //statements
    

    然后你这样运行 Job2:

    luigi --module <your_module> Job2 <tasks params>
    

    luigi会先运行Job1,完成后会运行Job2。

    【讨论】:

    • 感谢您提供解决方案,是的,我们实际上必须按照您的方式进行操作,但我想要的是在单个类中执行它们并且无法使 Job2 需要 Job1。假设 Job3 需要 Job1 和 Job2,所以在这种情况下,一旦 Job3 被执行: Job1 和 Job2 是流水线的,有什么办法:我可以让 Job1 在 Job2 开始之前执行。提前致谢
    • @TalatParwez 我认为您不了解 Luigi 的工作原理,或者我无法理解您。运行我作为解决方案发布的任务的方式是luigi --module your_module Job2 &lt;tasks params&gt;:这样 Luigi 将检查 Job1 是否已经运行,如果没有,它将执行它。 Job1 完成后,将运行 Job2。请注意,您从未告诉 Luigi 运行 Job1(从命令行),您只是要求它运行 Job2。
    【解决方案2】:

    一般情况下建议使用@matagus 的答案,但如果您因某种特殊原因无法使Job2 要求Job1,则可以使用dynamic dependency,如下所示。

    class ExecuteTwoJobs(luigi.ExternalTask):
    
        def output(self):
            //statements
    
        def run(self):
            yield Job1(*args, **kwargs)
            yield Job2(*args, **kwargs)
    

    当这个任务执行时,Job1如果没有完成就执行,然后Job2运行。不过,我们从 DAG 工作流管理中获得的好处会更少。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-08-01
      • 1970-01-01
      • 2017-03-25
      • 2017-05-10
      相关资源
      最近更新 更多