【问题标题】:How often is dag definition file read during a single dag run (is dag reevaluated / recalculated every time a task runs / fires)?在单个 dag 运行期间多久读取一次 dag 定义文件(每次任务运行/触发时都会重新评估/重新计算 dag)?
【发布时间】:2020-02-03 18:07:23
【问题描述】:

在单个 dag 运行期间多久读取一次 dag 定义文件?

有一个大的 dag,需要很长时间才能构建(约 1-3 分钟)。在 dag 运行时查看每个任务的日志,似乎每个任务在运行之前都在执行 dag 定义文件......

*** Reading local file: /home/airflow/airflow/logs/mydag/mytask/2020-01-30T04:51:34.621883+00:00/1.log
[2020-01-29 19:02:10,844] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: mydag.mytask2020-01-30T04:51:34.621883+00:00 [queued]>
[2020-01-29 19:02:10,866] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: mydag.mytask2020-01-30T04:51:34.621883+00:00 [queued]>
[2020-01-29 19:02:10,866] {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
[2020-01-29 19:02:10,866] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-01-29 19:02:10,866] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-01-29 19:02:10,883] {taskinstance.py:887} INFO - Executing <Task(BashOperator): precheck_db_perms> on 2020-01-30T04:51:34.621883+00:00
[2020-01-29 19:02:10,887] {standard_task_runner.py:52} INFO - Started process 140570 to run task
[2020-01-29 19:02:11,048] {logging_mixin.py:112} INFO - [2020-01-29 19:02:11,047] {dagbag.py:403} INFO - Filling up the DagBag from /home/airflow/airflow/dags/mydag.py
[2020-01-29 19:02:11,052] {logging_mixin.py:112} INFO - <output from my dag definition file>
[2020-01-29 19:02:11,101] {logging_mixin.py:112} INFO - <more output from my dag definition file>
....
....
....
[2020-01-29 19:02:58,651] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: mydag.mytask 2020-01-30T04:51:34.621883+00:00 [running]> airflowetl.co.local
[2020-01-29 19:02:58,674] {bash_operator.py:81} INFO - Tmp dir root location: 
 /tmp
[2020-01-29 19:02:58,674] {bash_operator.py:91} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=me@co.org
AIRFLOW_CTX_DAG_OWNER=me
AIRFLOW_CTX_DAG_ID=mydag
AIRFLOW_CTX_TASK_ID=mytask
AIRFLOW_CTX_EXECUTION_DATE=2020-01-30T04:51:34.621883+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2020-01-30T04:51:34.621883+00:00
[2020-01-29 19:02:58,675] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmphwu1ckty/mytaskbmnsizw5
<only now does the actual task logic output seem to start>

日志的第一部分似乎暗示每次运行新任务时都在运行 dag 文件(我在每个任务中都看到了这一点)。

这真的是这里发生的事情吗?这是正常/预期的行为吗?请注意,由于我的 dag 需要一些时间来构建,这意味着时间会在 dag 中的每个任务(在这种情况下有很多)成倍增加,这让我认为这要么不正常,要么有一些我没有在这里使用的最佳实践。有更多气流经验的人可以帮助解释我在这里看到的情况吗?

【问题讨论】:

    标签: airflow


    【解决方案1】:

    在对airflow email list 进行了一些讨论之后,事实证明 airflow 在运行时为每个任务构建 dag(因此每个任务都包括再次构建 dag 的开销(在我的情况下这是非常重要的) ))。来自对话

    是的,每个任务都在进程隔离中运行(并且可以在不同的机器上运行),因此每个任务都从头开始构建 DAG。

    基本上任务是由代码本身定义的,因此工作进程只能通过解析定义它的python代码来确定运行时要执行的代码。也许在某些情况下,可以在包含它的 dag 的完整上下文之外很好地定义任务,但通常并不适用。

    不是任务关心 DAB 结构,而是任务只作为 DAG 的一部分存在,在 Airflow 中获取任务的唯一方法是先获取 DAG。至少据我所知。

    最终腌制我用来构建图表的配置,并让 dag-config 构建一个计划的 dag,然后 triggers 我的实际 dag(该计划设置为无)命令就是我所做的。我的 dag 是在一个循环中创建的,该循环执行 DB 查询以为一组表创建 DAG 分支,因此涉及查询并导致开销 b/c 查询正在为每个任务完成,而实际上只需要执行一次为 DAG 制作配置字典)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-04
      • 2021-12-27
      • 1970-01-01
      • 1970-01-01
      • 2019-11-18
      • 1970-01-01
      相关资源
      最近更新 更多