【问题标题】:How can I run all all neccessary DAGs / Do I need ExternalTaskSensor?如何运行所有必要的 DAG / 我需要 ExternalTask​​Sensor 吗?
【发布时间】:2020-05-19 08:27:17
【问题描述】:

是否可以使用 externalTask​​Sensor 在另一个时间运行两个 dag?

我有两个 DAG。 DAG A 每两小时运行一次

  • 上午 10 点(成功)
  • 上午 12 点(失败)
  • 下午 2 点(成功)

Dag B 依赖于 DAG A。DAG B 在凌晨 12 点等待 DAG A 并失败,因为 DAG A 失败了。但是由于 DAG A 在下午 2 点成功,Dag B 应该会运行。

您如何实现这一点?使用 ExternalTask​​Sensor?

我只是有一个小假人,试图理解它。

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.timezone import datetime
from datetime import datetime, timedelta
import airflow 

source_dag = DAG(
    dag_id='sensor_dag_source',
    start_date = datetime(2020, 1, 20),
    schedule_interval='* * * * *'
)

first_task = DummyOperator(task_id='first_task', dag=source_dag)

target_dag = DAG(
    dag_id='sensor_dag_target',
    start_date = datetime(2020, 1, 20),
    schedule_interval='* * * * *'   
)

task_sensor = ExternalTaskSensor(
    dag=target_dag,
    task_id='dag_sensor_source_sensor',
    retries=100,
    retry_delay=timedelta(seconds=30),
    mode='reschedule',
    external_dag_id='sensor_dag_source',
    external_task_id='first_task'
)

first_task = DummyOperator(task_id='first_task', dag=target_dag)

task_sensor >> first_task

【问题讨论】:

    标签: airflow


    【解决方案1】:

    您可以尝试使用 TriggerDagRunOperator 并从 DAG A 触发 DAG B

    这是一个完整的答案- In airflow, is there a good way to call another dag's task?

    还有一篇关于它的好帖子-

    Wiring top-level DAGs together

    【讨论】:

      猜你喜欢
      • 2020-11-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-02-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多