【问题标题】:Can luigi rerun tasks when the task dependencies become out of date?当任务依赖关系过期时,luigi 可以重新运行任务吗?
【发布时间】:2015-05-01 20:42:31
【问题描述】:

据我所知,luigi.Target 可以存在,也可以不存在。 因此,如果存在luigi.Target,则不会重新计算。

我正在寻找一种方法来强制重新计算任务,如果它的一个依赖项被修改,或者如果其中一个任务的代码发生变化。

【问题讨论】:

    标签: python luigi


    【解决方案1】:

    实现目标的一种方法是重写 complete(...) 方法。

    The documentation for complete is straightforward.

    只需实现一个函数来检查您的约束,如果您想重新计算任务,则返回 False

    例如,要在更新依赖项时强制重新计算,您可以这样做:

    def complete(self):
        """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
        import os
        import time
    
        def mtime(path):
            return time.ctime(os.path.getmtime(path))
    
        # assuming 1 output
        if not os.path.exists(self.output().path):
            return False
    
        self_mtime = mtime(self.output().path) 
    
        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in self.requires():
            if not el.complete():
                return False
            for output in el.output():
                if mtime(output.path) > self_mtime:
                    return False
    
        return True
    

    当任何需求不完整或任何需求的修改时间比当前任务更新或当前任务的输出不存在时,这将返回 False

    检测代码何时更改更难。您可以使用类似的方案(检查mtime),但除非每个任务都有自己的文件,否则它会成败。

    由于能够覆盖complete,因此可以实现您想要重新计算的任何逻辑。如果您希望为许多任务使用特定的 complete 方法,我建议将 luigi.Task 子类化,在那里实现您的自定义 complete,然后从子类继承您的任务。

    【讨论】:

    • 第一次执行任务后,complete() 将运行,结果为 True。如果依赖项发生变化,luigi 怎么知道要重新运行 complete()?
    • @selotape complete() 每次调用任务时都会调用。这就是为什么文档建议不要在其中进行任何实际工作并且它是确定性的。为确保每当任务依赖任务发生更改时调用它,修改它们的 complete() 以检查依赖关系树是必要的。
    • 我认为您应该使用output.exists() 之类的东西而不是手动检查路径,因为目标类可能带有exists 的自定义定义。
    【解决方案2】:

    我来晚了,但这里有一个 mixin,它改进了接受的答案以支持多个输入/输出文件。

    class MTimeMixin:
        """
            Mixin that flags a task as incomplete if any requirement
            is incomplete or has been updated more recently than this task
            This is based on http://stackoverflow.com/a/29304506, but extends
            it to support multiple input / output dependencies.
        """
    
        def complete(self):
            def to_list(obj):
                if type(obj) in (type(()), type([])):
                    return obj
                else:
                    return [obj]
    
            def mtime(path):
                return time.ctime(os.path.getmtime(path))
    
            if not all(os.path.exists(out.path) for out in to_list(self.output())):
                return False
    
            self_mtime = min(mtime(out.path) for out in to_list(self.output()))
    
            # the below assumes a list of requirements, each with a list of outputs. YMMV
            for el in to_list(self.requires()):
                if not el.complete():
                    return False
                for output in to_list(el.output()):
                    if mtime(output.path) > self_mtime:
                        return False
    
            return True
    

    要使用它,您只需使用例如class MyTask(Mixin, luigi.Task) 声明您的类。

    【讨论】:

    • 我喜欢你的 mixin,但 Luigi 抱怨目标文件已经存在。评论不允许我有足够的空间来发布示例,所以我会将其作为非答案答案放在下面。
    【解决方案3】:

    上面的代码对我很有效,除了我相信正确的时间戳比较mtime(path)必须返回一个浮点数而不是一个字符串(“Sat”>“Mon”...[原文如此])。因此,简单地说,

    def mtime(path):
        return os.path.getmtime(path)
    

    代替:

    def mtime(path):
        return time.ctime(os.path.getmtime(path))
    

    【讨论】:

      【解决方案4】:

      关于 Shilad Sen 在下面发布的 Mixin 建议,请考虑以下示例:

      # Filename: run_luigi.py
      import luigi
      from MTimeMixin import MTimeMixin
      
      class PrintNumbers(luigi.Task):
      
          def requires(self):
              wreturn []
      
          def output(self):
              return luigi.LocalTarget("numbers_up_to_10.txt")
      
          def run(self):
              with self.output().open('w') as f:
                  for i in range(1, 11):
                      f.write("{}\n".format(i))
      
      class SquaredNumbers(MTimeMixin, luigi.Task):
      
          def requires(self):
              return [PrintNumbers()]
      
          def output(self):
              return luigi.LocalTarget("squares.txt")
      
          def run(self):
              with self.input()[0].open() as fin, self.output().open('w') as fout:
                  for line in fin:
                      n = int(line.strip())
                      out = n * n
                      fout.write("{}:{}\n".format(n, out))
      
      if __name__ == '__main__':
          luigi.run()
      

      MTimeMixin 在上面的帖子中。我使用一次运行任务

      luigi --module run_luigi SquaredNumbers
      

      然后我触摸文件 numbers_up_to_10.txt 并再次运行任务。然后 Luigi 提出以下抱怨:

        File "c:\winpython-64bit-3.4.4.6qt5\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
          os.rename(self.tmp_path, self.path)
      FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'
      

      这可能只是一个 Windows 问题,而不是 Linux 上的问题,“mv a b”可能只是删除旧的 b,如果它已经存在并且没有写保护。我们可以通过 Luigi/local_target.py 的以下补丁来解决这个问题:

      def move_to_final_destination(self):
          if os.path.exists(self.path):
              os.rename(self.path, self.path + time.strftime("_%Y%m%d%H%M%S.txt"))
          os.rename(self.tmp_path, self.path)
      

      为了完整起见,这里再次将 Mixin 作为一个单独的文件,来自另一篇文章:

      import os
      
      class MTimeMixin:
          """
              Mixin that flags a task as incomplete if any requirement
              is incomplete or has been updated more recently than this task
              This is based on http://stackoverflow.com/a/29304506, but extends
              it to support multiple input / output dependencies.
          """
      
          def complete(self):
              def to_list(obj):
                  if type(obj) in (type(()), type([])):
                      return obj
                  else:
                      return [obj]
      
              def mtime(path):
                  return os.path.getmtime(path)
      
              if not all(os.path.exists(out.path) for out in to_list(self.output())):
                  return False
      
              self_mtime = min(mtime(out.path) for out in to_list(self.output()))
      
              # the below assumes a list of requirements, each with a list of outputs. YMMV
              for el in to_list(self.requires()):
                  if not el.complete():
                      return False
                  for output in to_list(el.output()):
                      if mtime(output.path) > self_mtime:
                          return False
      
              return True
      

      【讨论】:

      • 感谢您的建议!我认为您对 Windows 问题的看法是正确的。我经常使用它,没有文件破坏问题。对于它的价值,这种整体方法让我有点不安。我总是觉得我在扭曲 Luigi,它不打算被使用。
      猜你喜欢
      • 1970-01-01
      • 2017-07-16
      • 1970-01-01
      • 1970-01-01
      • 2020-01-02
      • 2020-05-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多