【发布时间】:2021-06-04 07:40:47
【问题描述】:
我知道以前有人问过这个问题,但我想简单地说一下。我正在使用 Astronomer 运行 Airflow v1.10.10。每个 dag 运行(每天)可能有不同数量的任务。此任务数将由给定 dag 运行(即那一天)的数据库表中的行数定义。
我将在 dag 文件中运行一个查询并创建一个这样的列表:
list = client.query(SELECT * FROM TABLE WHERE DATE = {{ds}})
然后我会根据列表的大小来创建任务:
for i in range(1, len(list)):
sample_airflow_operator(name=i + '_operator') >> sample_airflow_operator(name=(i+1) + '_operator')
等等。我意识到这是一个粗略的例子,没有涵盖 i = 0 或 i = len(list) 时的情况,但我并不担心。
如果我们将这个 dag 定义为每天,是否会每天成功创建 dag,每天执行不同数量的任务(取决于我上面提到的查询结果)? Airflow 可以在每次 dag 运行之前执行list 查询吗?在保持 dag 开启和免维护的同时,这是否可行?
【问题讨论】:
标签: airflow