【问题标题】:Use same Airflow task in multiple branch在多个分支中使用相同的 Airflow 任务
【发布时间】:2022-11-22 06:54:09
【问题描述】:

有没有办法我可以重新使用需要在每个分支执行中执行的气流任务。

例如。我在每个 task_1 中都有以下任务,task_2 需要在第一个流程中运行,task_3 在第二个流程中运行,但在这两种情况下都需要运行 task_comm。我如何创建 1 个任务并在两个流程中调用它?

flow_1 = DummyOperator(task_id = 'flow_1')
task_1 = DummyOperator(task_id = 'task_1')
task_2 = DummyOperator(task_id = 'task_2')

flow_2 = DummyOperator(task_id = 'flow_2')
task_3 = DummyOperator(task_id = 'task_3')

task_comm = DummyOperator(task_id = 'task_comm')

branch >> flow_1 >> task_1 >> task2 >> task_comm
branch >> flow_2 >> task_3 >> task_comm

【问题讨论】:

    标签: python airflow task


    【解决方案1】:

    在您的代码中,您有两个不同的分支,其中一个将成功,第二个将被跳过。要在其中任何一个之后运行task_comm,您只需要更新其触发规则:

    from airflow.utils.trigger_rule import TriggerRule
    task_comm = DummyOperator(task_id = 'task_comm', trigger_rule=TriggerRule.NONE_FAILED)
    

    在这种情况下,task_comm 将在任务 [task2, task_3] 具有状态之一 [succeeded, skipped] 时执行。在您的情况下,其中一个将具有succeeded,第二个将具有skipped,这足以触发任务。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-08-19
      • 2016-07-19
      • 2020-06-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多