【问题标题】:Simple way to execute tasks defined as a DAG, in python?在python中执行定义为DAG的任务的简单方法?
【发布时间】:2021-12-04 08:17:19
【问题描述】:

我正在以一种复杂的方式运行一系列相互依赖的任务。我想将这些依赖关系描述为 DAG(有向无环图)并在需要时执行该图。

我一直在看气流,并写了一个虚拟脚本:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def cloud_runner():
    # my typical usage here would be a http call to a service (e.g. gcp cloudrun)
    pass


with DAG(dag_id="my_id", schedule_interval=None, start_date=datetime.max) as dag:
    first_task = PythonOperator(task_id="1", python_callable=cloud_runner)
    second_task = PythonOperator(task_id="2", python_callable=cloud_runner)
    second_task_bis = PythonOperator(task_id="2bis", python_callable=cloud_runner)
    third_task = PythonOperator(task_id="3", python_callable=cloud_runner)

    first_task >> [second_task, second_task_bis] >> third_task

运行以下命令完成这项工作:

airflow dags backfill my_id --start-date 2020-01-02

问题:

我的使用绝不会涉及任何类型的日程安排/开始日期/结束日期。此外,我的 DAG 将从 python Flask 服务器执行。

问题:

有没有办法在没有气流的情况下达到同样的效果?或者在独立的 Python 脚本中以仅触发模式(没有所有调度部分、airflow.db 等)使用气流?

谢谢

【问题讨论】:

    标签: python flask airflow google-cloud-run directed-acyclic-graphs


    【解决方案1】:

    Airflow 既是库又是应用程序。 DAG 不必以计划的方式运行。您可以使用 API/CLI 按需触发它们。 如果 Airflow 应用程序未运行,则无法运行 DAG(计划或手动触发)。 Airflow 需要调度程序和元数据库才能运行。

    要回答您的问题 - 不可以。您必须设置并运行 Airflow 才能使 DAG 运行。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-04-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-02-22
      • 1970-01-01
      相关资源
      最近更新 更多