【问题标题】:Airflow can't finish dag runs when the start date of a task is greater than the execution date当任务的开始日期大于执行日期时,气流无法完成 dag 运行
【发布时间】:2020-04-22 11:58:49
【问题描述】:

我正在尝试实现具有不同开始日期的多个任务的 DAG。 根据 Airflow 文档,绝对是允许的,但不确定它是否是正确的模式。

这里是一个 Dag 示例:

import datetime as dt
import os

from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG, Variable


with DAG(DAG_ID, default_args={}, schedule_interval="0 4 * * *",) as dag:
    op1 = DummyOperator(start_date=dt.datetime(2019, 7, 1), task_id="op1", dag=dag,)

    op2 = DummyOperator(start_date=dt.datetime(2019, 7, 5), task_id="op2", dag=dag,)

第一次dag运行时间安排在2019-07-01,执行op1没有问题,果然没有执行op2。这是我所期待的行为。 但是,调度程序无法移动到 2019 年 7 月 2 日的下一个 dag 运行,因为上一个 dag 运行(2019 年 7 月 1 日)仍处于运行状态。

有什么办法可以避免这种行为,并在所有带有start_date < execution_date 的任务都完成后自动将 dag 运行设置为完成?

我正在运行气流 1.10.2(但从一些简单的测试来看,移动到最新版本没有任何变化)。

【问题讨论】:

    标签: python python-3.x airflow airflow-scheduler


    【解决方案1】:

    上面的 dag 包含 op1 和 op2,它们的区别在于start_date

    看起来你想做的是:

    • op1 应从 2019 年 7 月 1 日起每天 04:00 运行。
    • op2 应从 2019 年 7 月 5 日起每天 04:00 运行。

    我建议

    • 要么有两个单独的 dags
    • 或使他们的start_date 相同。

    如果您不想使用这两种方法,那么有一个名为max_active_runs 的 dag 参数可以根据您的目的增加。阅读更多here

    【讨论】:

    • 示例中我没有包含,但是这两个运算符应该有一些依赖关系。 op1 >> op2。使用相同的 start_date 是一种选择,但是当没有数据时,您需要一些逻辑来管理 op2。 max_active_runs 并没有解决问题,它只是开始了更多的 dag 运行,但没有一个可以完成。
    • ExternalTaskSensor,希望对您有所帮助。
    • 嘿 Shiv,这是我想要避免的。我有太多依赖项,使用 ExternalTask​​Sensor 跟踪它们是一场噩梦。实际上,我正在从每个 dag 拥有 1 个任务和许多用于管理依赖关系的 ExternalTask​​Sensor 转变为包含所有连接任务的整体 dag。
    猜你喜欢
    • 2018-02-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多