【问题标题】:Dynamic dags not getting added by scheduler调度程序未添加动态 dag
【发布时间】:2019-08-12 13:58:52
【问题描述】:

我正在尝试创建动态 DAG,然后将它们发送到调度程序。我尝试了来自https://www.astronomer.io/guides/dynamically-generating-dags/ 的参考,效果很好。我在下面的代码中对其进行了一些更改。在调试问题时需要帮助。

我试过了 1. 试运行文件。 Dag 被执行并且 globals() 正在打印所有 DAG 对象。但不知何故没有在 list_dags 或 UI 中列出

from datetime import datetime, timedelta
import requests
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator

def create_dag(dag_id,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval="@hourly",
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)

    return dag


def fetch_new_dags(**kwargs):

    for n in range(1, 10):
        print("=====================START=========\n")
        dag_id = "abcd_" + str(n) 
        print (dag_id)
        print("\n")
        globals()[dag_id] = create_dag(dag_id, n, default_args)
        print(globals())

default_args = {
    'owner': 'diablo_admin',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 8),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'trigger_rule': 'none_skipped'
    #'schedule_interval': '0 * * * *'
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('testDynDags', default_args=default_args, schedule_interval='*/1 * * * *')
#schedule_interval='*/1 * * * *'

check_for_dags = PythonOperator(dag=dag,
                   task_id='tst_dyn_dag',
                   provide_context=True,
                   python_callable=fetch_new_dags
                   )




check_for_dags

预计动态创建 10 个 DAG 并添加到调度器中。

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    我想这样做可以解决它

    • 完全移除全局testDynDags dag 和tst_dyn_dags 任务(实例化和调用)
    • 使用全局范围内的必要参数调用您的 fetch_new_dags(..) 方法

    解释

    • 动态 dag/任务仅意味着您在编写 dag 定义文件时具有明确定义的逻辑,它可以帮助以预定义的方式创建具有已知结构的任务/dag .
    • 您无法在运行时确定 DAG 的结构(任务执行)。因此,例如,如果上游任务返回整数值 n,则不能将 n 个相同的任务添加到 DAG。但是您可以遍历包含 n 个段的 YAML 文件并生成 n 个任务/dag。

    很明显,将 dag 生成代码包装在 Airflow 任务本身是没有意义的。


    UPDATE-1

    根据 cmets 中的指示,我推断该要求要求您修改将输入(要创建多少 dag 或任务)提供给 DAG/任务生成脚本的外部源。虽然这确实是一个复杂的用例,但实现此目的的一种简单方法是创建 2 个单独的 DAG。

    • 一个 dag 偶尔运行一次,并生成存储在外部资源(如 Airflow 变量)(或任何其他外部存储,如文件/S3/数据库等)中的输入
    • 第二个 DAG 是通过读取第一个 DAG 写入的同一数据源以编程方式构建的

    您可以从Adding DAGs based on Variable value section 获得灵感

    【讨论】:

    • 谢谢舒巴姆。我的目的是创建具有相同结构的新 dag,每个 DAG 中任务的输入会有所不同,包括时间表。因此,逻辑和我需要 testDynDags 以常规频率运行。如果我删除 testDynDags 和 tst_dyn_dags 任务。如何继续从外部源调用 fetch_new_dags 方法(因为我找不到要执行的 API)?目前,在 tst_dyn_dags 任务中,我实际上正在调用一个 rest api 并检查要创建的新 dag_ids 以及所需的参数。
    • 再次感谢舒巴姆。我使用一个 DAG(它调用 python 程序)和一个只创建 DAG 的 python 程序来实现这一点。
    • @Saideep 你能指导我如何实现这一目标吗?
    猜你喜欢
    • 2021-10-22
    • 2019-05-04
    • 2021-04-09
    • 1970-01-01
    • 2018-12-09
    • 2016-12-12
    • 1970-01-01
    • 2020-03-07
    • 1970-01-01
    相关资源
    最近更新 更多