【问题标题】:Airflow tasks set to `no_status` when catchup is True当追赶为真时,气流任务设置为“no_status”
【发布时间】:2021-10-07 15:19:53
【问题描述】:

我正在尝试配置一系列 Airflow 任务来回填一些数据 (catchup=True)。部署 DAG 并取消暂停后,第一个作业会成功运行,但所有后续运行的任务都设置为 no_status,并且它们永远不会启动。

我尝试了重命名 DAG、重新启动 Airflow 服务器和调度程序、清除旧日志的变体,但这里没有任何进展。

想法?

DAG 代码:

default_args = {
    "owner": "me",
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    "sla": timedelta(hours=1),
    "start_date": "2021-01-01T00:00",
}

dag = DAG(
    catchup=True,
    dag_id="ingest_dag_testing_6",
    dagrun_timeout=timedelta(hours=1),
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="30 * * * *",
)

DATA_SOURCE_TYPES = [
    {
        "target_name": "task_a",
        "children": [
            {
                "target_name": "subtask_a1",
            },
            {
                "target_name": "subtask_a2",
            },
        ],
    }
]

with dag:
    for dst in DATA_SOURCE_TYPES:
        sub_ingest_tasks = []

        ingest_task = PythonOperator(
            task_id=f"ingest_{dst.get('target_name')}",
            python_callable=run_ingestion,
            op_args=[logger, exe_date, dst],
        )
        if dst.get("children"):
            for sdst in dst.get("children"):
                sub_ingest_tasks.append(
                    PythonOperator(
                        task_id=f"ingest_{sdst.get('target_name')}",
                        python_callable=run_ingestion,
                        op_args=[logger, exe_date, sdst],
                    )
                )

        ingest_task >> sub_ingest_tasks

【问题讨论】:

  • 测试了你的示例代码,它对我来说很好。

标签: airflow airflow-scheduler


【解决方案1】:

您的代码执行得很好。

我从您的代码中创建了一个可运行的示例(因为它缺少导入/可调用):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta


def run_ingestion(**context):
    print("Hello World")

default_args = {
    "owner": "me",
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    "sla": timedelta(hours=1),
    "start_date": "2021-01-01T00:00",
}

dag = DAG(
    catchup=True,
    dag_id="ingest_dag_testing_6",
    dagrun_timeout=timedelta(hours=1),
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="30 * * * *",
)

DATA_SOURCE_TYPES = [
    {
        "target_name": "task_a",
        "children": [
            {
                "target_name": "subtask_a1",
            },
            {
                "target_name": "subtask_a2",
            },
        ],
    }
]

with dag:
    for dst in DATA_SOURCE_TYPES:
        sub_ingest_tasks = []

        ingest_task = PythonOperator(
            task_id=f"ingest_{dst.get('target_name')}",
            python_callable=run_ingestion,
            #op_args=[logger, exe_date, dst],
        )
        if dst.get("children"):
            for sdst in dst.get("children"):
                sub_ingest_tasks.append(
                    PythonOperator(
                        task_id=f"ingest_{sdst.get('target_name')}",
                        python_callable=run_ingestion,
                        #op_args=[logger, exe_date, sdst],
                    )
                )

        ingest_task >> sub_ingest_tasks

你可以看到它工作正常:

如果您运行的是旧版 Airflow,则更改 dag_id 可能会解决问题。可能是与此 dag_id 相关的一些旧的 db 记录痕迹未正确清理。调度程序在以后的版本中进行了重大重构。

如果上述方法没有帮助,唯一的解决方案可能是升级到最新的 Airflow 版本,因为它可能是旧版本中的一个错误,在此过程中已修复(因为您共享的代码不会重现您遇到的问题在最新的 Airflow 版本中描述)。

【讨论】:

  • 谢谢,@Elad。我做了一些额外的故障排除,发现 SchedulerJobs 在第一个作业完成后计时。我得再修改一下这些设置。
猜你喜欢
  • 2021-04-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-09-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多