【问题标题】:Wait for all the downstream tasks from the previous run in Apache Airflow等待 Apache Airflow 中上次运行的所有下游任务
【发布时间】:2020-10-16 14:32:23
【问题描述】:

我正在运行多个实例。所以, max_active_runs=1 不是我的解决方案。 wait_for_downstream 只能等待下游任务。

来自baseoperator.py代码,

wait_for_downstream:设置为 true 时,任务实例 X 将立即等待前一个实例下游的任务 任务 X 在运行之前成功完成。如果 任务 X 的不同实例会更改相同的资产,并且该资产 由任务 X 下游的任务使用。请注意,depends_on_past 在使用 wait_for_downstream 的任何地方都强制为 True。另请注意 仅等待前一个任务实例下游的立即任务 为了;任何下游任务的状态都会被忽略。

如何等待所有下游任务?

我的 DAG 中有多个并行步骤。

编辑:

我按照 Relic16 的建议尝试了 ExternalTask​​Sensor 以等待 C1/C2/C3 .. 完成。它在回填期间创造了竞争条件。尽管depends_on_past 为True,但同时运行多个实例。一旦第一次运行的 ExternalTask​​Sensor 成功并且处理了实际的下一个任务(A1/A2/A3..),它就会在第二次运行中触发 ExternalTask​​Sensor 并消耗所有插槽(因为有超过 16 个并行步骤)并继续等待 C1/C2/C3 .. 完成。

【问题讨论】:

    标签: airflow


    【解决方案1】:

    假设 B、C 和 D 是 A 的下游任务。 我可以创建一个虚拟任务 E,它是 B、C 和 D 的下游任务。

    现在我在 A 中设置了一个 ExternalTask​​Sensor,它等待任务 E 的先前执行完成。

    在第一次运行时,我会手动将任务标记为成功,但之后 A 将始终等待上一批任务 E 的成功执行。

    我可以通过E中的触发条件,进一步控制B、C、D的状态对任务E的依赖。

    【讨论】:

    • ExternalTask​​Sensor 正在回填期间创建竞争条件。由于一次运行多个实例,并且depends_on_past 为True。一旦第 1 次运行的 ExternalTask​​Sensor 完成并转移到实际任务,它就会在第 2 次运行中触发 ExternalTask​​Sensor 并消耗所有插槽。
    • 在 ExternalTask​​Sensor 中使用 reschedule 模式,我能够解决竞争条件。
    猜你喜欢
    • 2019-08-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-07
    • 2018-07-01
    相关资源
    最近更新 更多