【发布时间】:2020-12-12 08:23:21
【问题描述】:
请不要对我的回答投反对票。如果需要,我会更新并更正我的话。我已经完成了我的家庭作业研究。我有点新,所以试图理解这一点。
我想了解 Google 云平台上的气流如何获取从 dags 主文件夹到 UI 的更改。另外请帮助我使用我的 dags 设置脚本。我已经阅读了很多答案以及书籍。书的链接是here
我试着从第 69 页找出我的答案
3.11 调度和触发器 Airflow 调度器监控所有任务和所有 DAG,并触发其依赖项具有 被满足。在幕后,它监控并与 它可能包含的所有 DAG 对象的文件夹,并且定期(每个 分钟左右)检查活动任务,看看它们是否可以 触发。
我对这本书的理解是调度程序会定期从 dags 主文件夹中获取更改。 (对吗?)
我还阅读了有关堆栈溢出的多个答案,我发现这个很有用 Link
但答案仍然不包含从 dag 主文件夹中的 script.py 创建/更新 dagbag 的过程。如何感知变化。
请帮助我完成我的 dags 设置脚本。 我们创建了一个通用的 Python 脚本,它通过读取/迭代配置文件来动态创建 dag。
下面是目录结构
/dags/workflow/
/dags/workflow/config/dag_a.json
/dags/workflow/config/dag_b.json
/dags/workflow/task_a_with_single_operator.py
/dags/workflow/task_b_with_single_operator.py
/dags/dag_creater.py
dag_creater.py的执行流程如下 :-
1. Iterate in dags/workflow/config folder get the Config JSON file and
read variable dag_id.
2. create Parent_dag = DAG(dag_id=dag_id,
start_date=start_date, schedule_interval=schedule_interval,
default_args=default_args, catchup=False)
3. Read tasks with dependencies of that dag_id from config json file
(example :- [[a,[]],[b,[a]],[c,[b]]]) and code it as task_a >>
task_b >> task_c
这样就创建了 dag。一切正常。 Dags 在 UI 上也可见并且运行良好。
但问题是,我的 dag 创建脚本每次都在运行。即使在每个任务日志中,我也会看到所有 dag 的日志。我希望这个脚本运行一次。只是为了填写元数据条目。我无法理解为什么它每次都在运行。 请让我理解这个过程。
我知道一旦我们第一次设置元数据,airflow initdb 就会运行。所以这并不是一直在做这个更新。
- 调度器心跳是否全部更新?
- 我的设置是否正确?
请注意:我不能输入真正的代码,因为这是我的限制 组织。但是,如果被问到,我会提供更多信息。
【问题讨论】:
-
您是否尝试将 DAG 创建脚本定位在
dagsfolder 之外的某个位置,以消除 Composer 调度此脚本的风险? -
不,我们没有尝试过。但是说所有的python文件都是定期编译的对吗?
标签: google-cloud-platform airflow airflow-scheduler