【问题标题】:How to create dags inside another dag apache airflow如何在另一个 dag apache 气流中创建 dag
【发布时间】:2020-07-04 18:15:14
【问题描述】:

我正在尝试拥有一个 master dag,它将根据我的需要创建更多的 dag。 我在 airflow.cfgdags_folder 中有以下 python 文件。 此代码在数据库中创建主 dag。这个主 dag 应该读取一个文本文件,并且应该为文本文件中的每一行创建 dag。但是在主 dag 中创建的 dag 不会添加到数据库中。创建它的正确方法是什么?

版本详情:

Python 版本:3.7

Apache-airflow 版本:1.10.8

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

root_dir = "/home/user/TestSpace/airflow_check/res"

print("\n\n ===> \n Dag generator")

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 3, 22, 00, 00, 00),
    'concurrency': 1,
    'retries': 0
}


def greet(_name):
    message = "Greetings {} at UTC: {} Local: {}\n".format(_name, dt.datetime.utcnow(), dt.datetime.now())
    f = open("{}/greetings.txt".format(root_dir), "a+")
    print("\n\n =====> {}\n\n".format(message))
    f.write(message)
    f.close()


def create_dag(dag_name):
    with DAG(dag_name, default_args=default_args,
             schedule_interval='*/2 * * * *',
             catchup=False
             ) as i_dag:
        i_opr_greet = PythonOperator(task_id='greet', python_callable=greet,
                                     op_args=["{}_{}".format("greet", dag_name)])
        i_echo_op = BashOperator(task_id='echo', bash_command='echo `date`')

        i_opr_greet >> i_echo_op
    return i_dag


def create_all_dags():
    all_lines = []
    f = open("{}/../dag_names.txt".format(root_dir), "r")
    for x in f:
        all_lines.append(str(x))
    f.close()

    for line in all_lines:
        print("Dag creation for {}".format(line))
        globals()[line] = create_dag(line)


with DAG('master_dag', default_args=default_args,
         schedule_interval='*/1 * * * *',
         catchup=False
         ) as dag:
    echo_op = BashOperator(task_id='echo', bash_command='echo `date`')
    create_op = PythonOperator(task_id='create_dag', python_callable=create_all_dags)
    echo_op >> create_op

【问题讨论】:

    标签: python python-3.x airflow


    【解决方案1】:

    你有两个选择:

    1. 使用 SubDagOperatorExample DAG。如果您的计划间隔可以相同,请使用它。
    2. 编写 Python DAG 文件:从您掌握的 DAG 中,在包含 DAG 的 AIRFLOW_HOME 中创建 Python 文件。您可以为此使用 Jinja2 模板引擎。

    【讨论】:

      【解决方案2】:
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-10-15
      • 2018-08-05
      • 1970-01-01
      • 2019-09-16
      • 2017-03-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多