如果您使用 Docker 运行 Airflow,您可以使用 DAG 中的 BashOperator 删除另一个 DAG:
# Remove another DAG through the standard Airflow CLI; dag_id must already
# hold the name of the DAG to delete.
t1 = BashOperator(
    task_id='delete_dag_task',
    bash_command=f'airflow dags delete -y {dag_id}',
)
dag_id 是要删除的 DAG 的名称。这种方式使用标准的 CLI 命令，而不是自行从元数据库中删除记录。您还需要使用 PythonOperator 从 dags 目录中删除对应的 DAG 文件。
我有这样一个 DAG:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash import BashOperator
import os
# Defaults handed to every operator in the DAG; any single task can still
# override them when it is instantiated.
default_args = dict(
    start_date=days_ago(1),
    owner='airflow',
    retries=1,
)
def delete_dag(**context):
    """Delete the DAG named in the triggering run's conf via the Airflow CLI.

    The target DAG id is read from ``context["dag_run"].conf["dag_name"]``
    and passed to ``airflow dags delete -y``, which is run through a
    BashOperator executed inline.

    Args:
        **context: Airflow task context; must contain ``dag_run``.

    Raises:
        ValueError: If the run conf has no non-empty string ``dag_name``.
    """
    # Local import keeps this function self-contained.
    import shlex

    # conf may be None when the run was triggered without a payload.
    conf = context["dag_run"].conf or {}
    dag_id = conf.get("dag_name")
    if not isinstance(dag_id, str) or not dag_id:
        raise ValueError(
            'dag_run.conf must contain a non-empty string "dag_name"'
        )

    # dag_name comes from an external request, so quote it before it is
    # interpolated into a shell command to prevent command injection.
    t1 = BashOperator(
        task_id='delete_dag_task',
        bash_command=f'airflow dags delete -y {shlex.quote(dag_id)}',
    )
    t1.execute(context=context)
def delete_dag_file(**context):
    """Remove ``<dag_name>.py`` from the directory containing this file.

    The DAG name is read from ``context["dag_run"].conf["dag_name"]``.
    Removal is best-effort: a missing file or an OS-level failure is logged
    and does not fail the task.

    Args:
        **context: Airflow task context; must contain ``dag_run``.

    Raises:
        ValueError: If ``dag_name`` is missing, not a non-empty string, or
            contains path components.
    """
    # Local import keeps this function self-contained.
    import logging

    log = logging.getLogger(__name__)

    # conf may be None when the run was triggered without a payload.
    conf = context["dag_run"].conf or {}
    dag_id = conf.get("dag_name")
    if not isinstance(dag_id, str) or not dag_id:
        raise ValueError(
            'dag_run.conf must contain a non-empty string "dag_name"'
        )
    # dag_name comes from an external request: refuse anything that could
    # escape this directory (e.g. "../other" or an absolute path).
    if os.path.basename(dag_id) != dag_id:
        raise ValueError(
            f'"dag_name" must not contain path components: {dag_id!r}'
        )

    script_dir = os.path.dirname(__file__)
    dag_file_path = os.path.join(script_dir, '{}.py'.format(dag_id))
    try:
        os.remove(dag_file_path)
    except FileNotFoundError:
        # Already gone: nothing to do.
        log.info("DAG file %s not found; nothing to remove", dag_file_path)
    except OSError:
        # Keep the original best-effort behaviour, but leave a trace.
        log.warning(
            "Could not remove DAG file %s", dag_file_path, exc_info=True
        )
# DAG with no schedule (schedule_interval=None), so it only runs when
# triggered. It first deletes the target DAG and then removes that DAG's
# source file.
with DAG(
    'dag-deleter',
    schedule_interval=None,
    default_args=default_args,
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    # Task variables carry a _task suffix so they do not rebind (shadow) the
    # module-level callables delete_dag / delete_dag_file.
    # provide_context is omitted: it is deprecated in Airflow 2, where the
    # context is passed to the callable automatically.
    delete_dag_task = PythonOperator(
        task_id="delete_dag",
        python_callable=delete_dag,
    )
    delete_dag_file_task = PythonOperator(
        task_id="delete_dag_file",
        python_callable=delete_dag_file,
    )
    # Delete the DAG before removing its file.
    delete_dag_task >> delete_dag_file_task
我使用 REST API 触发该 DAG，并在 HTTP 请求中传递以下负载：
{"conf": {"dag_name": "my_dag_name"} }