【问题标题】:How airflow loads/updates DagBag from dags home folder on google cloud platform?气流如何从谷歌云平台上的 dags 主文件夹加载/更新 DagBag?
【发布时间】:2020-12-12 08:23:21
【问题描述】:

请不要对我的回答投反对票。如果需要,我会更新并更正我的话。我已经完成了我的家庭作业研究。我有点新,所以试图理解这一点。

我想了解 Google 云平台上的气流如何获取从 dags 主文件夹到 UI 的更改。另外请帮助我使用我的 dags 设置脚本。我已经阅读了很多答案以及书籍。书的链接是here

我试着从第 69 页找出我的答案

3.11 调度和触发器 Airflow 调度器监控所有任务和所有 DAG,并触发其依赖项具有 被满足。在幕后,它监控并与 它可能包含的所有 DAG 对象的文件夹,并且定期(每个 分钟左右)检查活动任务,看看它们是否可以 触发。

我对这本书的理解是调度程序会定期从 dags 主文件夹中获取更改。 (对吗?)

我还阅读了有关堆栈溢出的多个答案,我发现这个很有用 Link

但答案仍然不包含从 dag 主文件夹中的 script.py 创建/更新 dagbag 的过程。如何感知变化。

请帮助我完成我的 dags 设置脚本。 我们创建了一个通用的 Python 脚本,它通过读取/迭代配置文件来动态创建 dag。

下面是目录结构

/dags/workflow/
/dags/workflow/config/dag_a.json
/dags/workflow/config/dag_b.json
/dags/workflow/task_a_with_single_operator.py
/dags/workflow/task_b_with_single_operator.py
/dags/dag_creater.py

dag_creater.py的执行流程如下 :-

 1. Iterate in dags/workflow/config folder get the Config JSON file and
    read variable dag_id.
 2. create Parent_dag = DAG(dag_id=dag_id,
    start_date=start_date, schedule_interval=schedule_interval,
                             default_args=default_args, catchup=False) 
 3. Read tasks with dependencies of that dag_id from config json file
    (example :- [[a,[]],[b,[a]],[c,[b]]]) and code it as task_a >>
    task_b >> task_c

这样就创建了 dag。一切正常。 Dags 在 UI 上也可见并且运行良好。

但问题是,我的 dag 创建脚本每次都在运行。即使在每个任务日志中,我也会看到所有 dag 的日志。我希望这个脚本运行一次。只是为了填写元数据条目。我无法理解为什么它每次都在运行。 请让我理解这个过程。

我知道一旦我们第一次设置元数据,airflow initdb 就会运行。所以这并不是一直在做这个更新。

  • 调度器心跳是否全部更新?
  • 我的设置是否正确?

请注意:我不能输入真正的代码,因为这是我的限制 组织。但是,如果被问到,我会提供更多信息。

【问题讨论】:

  • 您是否尝试将 DAG 创建脚本定位在 dags folder 之外的某个位置,以消除 Composer 调度此脚本的风险?
  • 不,我们没有尝试过。但是说所有的python文件都是定期编译的对吗?

标签: google-cloud-platform airflow airflow-scheduler


【解决方案1】:

Airflow Scheduler 实际上是在 Airflow 运行时环境中持续运行,作为监视 DAG 文件夹中的更改并触发驻留在该文件夹中的相关 DAG 任务的主要贡献者。 Airflow Scheduler 服务的主要设置可以在airflow.cfg 文件中找到,本质上是心跳intervals,它有效地影响了一般 DAG 任务的维护。

但是,特定任务的执行方式是根据 Airflow 配置中的 Executor's 模型定义的。

为了存储可用于 Airflow 运行时环境的 DAG,GCP Composer 使用 Cloud Storage,实现特定文件夹 structure,同步到达 /dags 文件夹的任何对象,扩展名为 *.py 验证它是否包含 DAG @987654325 @。

如果您希望在 Airflow 运行时中运行 DAG 传播脚本,那么在这个特定用例中,我建议您查看 PythonOperator,在单独的 DAG 中使用它来调用和执行您的自定义通用 Python 代码并保证 @ 987654327@ 一次只有一次。你可以查看这个 Stack thread 的实现细节。

【讨论】:

  • 谢谢@Nick_kh !! .是的,这是有道理的。只是为可能阅读本文的人添加一些内容。我发现这个链接非常有用。 stackoverflow.com/questions/51558313/….
  • 实际上属性 dag_dir_list_interval 正在做所有事情。我也不确定,但我觉得属性 (store_dag_code) 对某些人也很有用,因为它从数据库而不是文件夹加载 dag。
猜你喜欢
  • 2020-12-11
  • 2018-08-03
  • 1970-01-01
  • 2018-04-06
  • 1970-01-01
  • 1970-01-01
  • 2021-12-11
  • 1970-01-01
  • 2016-12-26
相关资源
最近更新 更多