【问题标题】:Looping tasks in PrefectPrefect 中的循环任务
【发布时间】:2021-07-27 00:17:54
【问题描述】:

我想一次又一次地循环任务,直到达到某个条件,然后再继续工作流程的其余部分。

到目前为止,我所拥有的是:

# Loop task
class MyLoop(Task):
    def run(self):
        loop_res = prefect.context.get("task_loop_result", 1)
        print (loop_res)
        if loop_res >= 10:
            return loop_res
        raise LOOP(result=loop_res+1)

但据我了解,这不适用于多项任务。 有没有办法进一步返回并一次循环执行多个任务?

【问题讨论】:

    标签: python workflow prefect


    【解决方案1】:

    解决方案是简单地创建一个任务,该任务本身创建一个具有一个或多个参数的新流并调用 flow.run()。例如:

    class MultipleTaskLoop(Task):
        def run(self):
            # Get previous value
            loop_res = prefect.context.get("task_loop_result", 1)
            
            # Create subflow
            with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
                x = Parameter('x', default = 1)
                loop1 = print_loop()
                add = add_value(x)
                loop2 = print_loop()
                loop1.set_downstream(add)
                add.set_downstream(loop2)
    
            # Run subflow and extract result
            subflow_res = flow.run(parameters={'x': loop_res})
            new_res = subflow_res.result[add]._result.value
    
            # Loop
            if new_res >= 10:
                return new_res
            raise LOOP(result=new_res)
    

    print_loop 只是在输出中打印“循环”,add_value 将其接收到的值加一。

    【讨论】:

    • 这很聪明。我从来没有想过这样做。我很确定如果我尝试过有人会拒绝我的 MR,哈哈,但这是做 OP 想要做的事的聪明方法。
    • 嗯,它是由他们的 Slack 上的一位 Prefect 开发者推荐给我的,所以即使它可能不是我认为可以容忍的最优雅的方式 :)
    【解决方案2】:

    除非我遗漏了什么,否则答案是否定的。

    完美的流程是DAGs,你所描述的(一次又一次地循环多个任务直到满足某个条件)会形成一个循环,所以你不能这样做。

    这可能有帮助,也可能没有帮助,但是您可以尝试将您想要循环的所有任务放入一个任务中,并在该任务中循环,直到满足您的退出条件。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-01
      • 2011-05-28
      • 1970-01-01
      • 2014-04-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多