【发布时间】:2021-06-30 20:09:53
【问题描述】:
我从一年前开始使用 Airflow(在 Cloud Composer 上),我很难弄清楚 (Celery) 工作人员在收到要执行的任务时如何知道要执行哪些操作。
据我了解:
- 我们将一些 DAG 放在 /dags 文件夹中。
- 调度程序通过循环过程解析 DAG 并将结果保存在元数据数据库中,它还会根据其依赖关系确定 DAG 中的任务是否必须运行。
- 如果某些任务必须运行,Executor 会将任务发送到 Celery 工作人员监听的队列。
- 其中一名 Celery 工人得到了执行和完成工作的任务。
但是 Celery 工人如何知道要执行什么? 我可以看到有一个日志说:
[2021-06-30 12:58:59,814] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'dag_to_exec', 'task_to_exec', '2021-06-30T12:57:09+00:00', '--job_id', '2822201', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/dag_to_exec.py', '--cfg_path', '/tmp/tmpank91zop']
如果我错了,请纠正我,但 '-sd', 'DAGS_FOLDER/dag_to_exec.py' 的部分是否在这里对这个 Airflow 工作人员说“从保存在那里的这个 dag 执行这个任务”?所以 Airflow 工作人员也需要解析 DAG 才能理解它,对吗?我说“也”是因为调度程序确实解析得太早了。
如果您有共享链接或部分源代码可以查看以了解这一点,提前致谢!
【问题讨论】:
-
您可以查看airflow's github 获取他们的源代码。如果您对气流流道感兴趣,可以从 airflow/airflow/task/task_runner 开始。这个article 也可以帮助理解气流如何分配芹菜工人的工作。
标签: airflow google-cloud-composer