【发布时间】:2021-10-07 15:19:53
【问题描述】:
我正在尝试配置一系列 Airflow 任务来回填一些数据 (catchup=True)。部署 DAG 并取消暂停后,第一个作业会成功运行,但所有后续运行的任务都设置为 no_status,并且它们永远不会启动。
我尝试了重命名 DAG、重新启动 Airflow 服务器和调度程序、清除旧日志的变体,但这里没有任何进展。
想法?
DAG 代码:
default_args = {
"owner": "me",
"retries": 2,
"retry_delay": timedelta(minutes=2),
"sla": timedelta(hours=1),
"start_date": "2021-01-01T00:00",
}
dag = DAG(
catchup=True,
dag_id="ingest_dag_testing_6",
dagrun_timeout=timedelta(hours=1),
default_args=default_args,
max_active_runs=1,
schedule_interval="30 * * * *",
)
DATA_SOURCE_TYPES = [
{
"target_name": "task_a",
"children": [
{
"target_name": "subtask_a1",
},
{
"target_name": "subtask_a2",
},
],
}
]
with dag:
for dst in DATA_SOURCE_TYPES:
sub_ingest_tasks = []
ingest_task = PythonOperator(
task_id=f"ingest_{dst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, dst],
)
if dst.get("children"):
for sdst in dst.get("children"):
sub_ingest_tasks.append(
PythonOperator(
task_id=f"ingest_{sdst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, sdst],
)
)
ingest_task >> sub_ingest_tasks
【问题讨论】:
-
测试了你的示例代码,它对我来说很好。