【问题标题】:Is it possible (and a good idea) to create Airflow dags dynamically based on the results from a database?是否可以(也是一个好主意)根据数据库的结果动态创建 Airflow dag?
【发布时间】:2021-06-04 07:40:47
【问题描述】:

我知道以前有人问过这个问题,但我想简单地说一下。我正在使用 Astronomer 运行 Airflow v1.10.10。每个 dag 运行(每天)可能有不同数量的任务。此任务数将由给定 dag 运行(即那一天)的数据库表中的行数定义。

我将在 dag 文件中运行一个查询并创建一个这样的列表:

list = client.query(SELECT * FROM TABLE WHERE DATE = {{ds}})

然后我会根据列表的大小来创建任务:

for i in range(1, len(list)):
    sample_airflow_operator(name=i + '_operator') >> sample_airflow_operator(name=(i+1) + '_operator') 

等等。我意识到这是一个粗略的例子,没有涵盖 i = 0 或 i = len(list) 时的情况,但我并不担心。

如果我们将这个 dag 定义为每天,是否会每天成功创建 dag,每天执行不同数量的任务(取决于我上面提到的查询结果)? Airflow 可以在每次 dag 运行之前执行list 查询吗?在保持 dag 开启和免维护的同时,这是否可行?

【问题讨论】:

    标签: airflow


    【解决方案1】:

    由于您不是在创建 DAG,而是在 dag 内部创建任务,简短的回答是这将起作用

    据我所知,这不是一种常见的做法,但有一些已知的缺点,我认为对于非常特殊的用例这样做是有意义的。

    但是要记住的事情很少,

    • 对于任何动态生成的任务,请勿将depend_on_pastwait_for_downstream 用作True。具有这些设置的任务会检查之前的执行情况,如果之前没有执行,它们将不会运行。

    • DAG UI(树模式)将仅显示基于最新执行的任务。

    • DAG UI(图形模式)将根据执行 dag 时 dag 中的内容显示任务。但是,我不确定您是否能够重新运行当前 DAG 中不存在的旧任务。所以这是一个妥协。

    【讨论】:

    • 这很有意义,我会将depend_on_past 应用于以前的任务,每次都是静态的。 UI cmets 也很有意义。非常感谢。
    • 很高兴听到。您能否将此答案标记为“已接受”
    • 我只是在等看看是否还有其他输入。再次感谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-05-29
    • 1970-01-01
    • 2018-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多