【问题标题】:Airflow workers : how do they know what to do? + problem with气流工作者:他们怎么知道该怎么做? + 问题
【发布时间】:2021-06-30 20:09:53
【问题描述】:

我从一年前开始使用 Airflow(在 Cloud Composer 上),我很难弄清楚 (Celery) 工作人员在收到要执行的任务时如何知道要执行哪些操作。

据我了解:

  • 我们将一些 DAG 放在 /dags 文件夹中。
  • 调度程序通过循环过程解析 DAG 并将结果保存在元数据数据库中,它还会根据其依赖关系确定 DAG 中的任务是否必须运行。
  • 如果某些任务必须运行,Executor 会将任务发送到 Celery 工作人员监听的队列。
  • 其中一名 Celery 工人得到了执行和完成工作的任务。

但是 Celery 工人如何知道要执行什么? 我可以看到有一个日志说:

[2021-06-30 12:58:59,814] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'dag_to_exec', 'task_to_exec', '2021-06-30T12:57:09+00:00', '--job_id', '2822201', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/dag_to_exec.py', '--cfg_path', '/tmp/tmpank91zop']

如果我错了,请纠正我,但 '-sd', 'DAGS_FOLDER/dag_to_exec.py' 的部分是否在这里对这个 Airflow 工作人员说“从保存在那里的这个 dag 执行这个任务”?所以 Airflow 工作人员也需要解析 DAG 才能理解它,对吗?我说“也”是因为调度程序确实解析得太早了。

如果您有共享链接或部分源代码可以查看以了解这一点,提前致谢!

【问题讨论】:

标签: airflow google-cloud-composer


【解决方案1】:

是的。你有正确的理解。

DAG 由 Workers 和 Scheduler 进行解析。调度程序永远不会执行BaseOperator 定义对象的execute() 方法。它将解析 DAG 文件,将 DAG 和 Operators 构建为 Python 对象,并在它们之间建立关系以便能够知道应该安排什么。

每个工作人员在执行任务之前重新执行此解析/创建步骤,以便能够构建“BaseOperator”派生对象(包括依赖项,但这些对工作人员并不重要),选择正确的“任务” (即由 task_id 标识的 BaseOperator 派生对象并运行它的 execute() 方法(还有一些细微差别,如 pre-executepost-execute 方法也在执行中)。

这在https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html#workloads最容易到达的几个地方进行了高级描述

如果你觉得这样一个“类/解析”关系的描述不容易找到/理解,我衷心邀请你帮助社区,从新人来的角度添加这样的描述。如果第一次尝试理解它的人为后面的其他人提供更多背景信息,这总是最好的(作为长期的 Airflow 提交者,我们有很多假设)。

这实际上超级简单https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html#workloads - 在底部包含“建议更改”链接,您可以使用它对文档进行 PR(确保您先 fork 气流回购)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-07-15
    • 1970-01-01
    • 2017-05-05
    • 1970-01-01
    • 1970-01-01
    • 2010-12-07
    • 2014-02-28
    • 1970-01-01
    相关资源
    最近更新 更多