【发布时间】:2020-08-14 07:32:12
【问题描述】:
我有一个 python DAG Parent Job 和 DAG Child Job。 Child Job 中的任务应在成功完成每天运行的Parent Job 任务时触发。如何添加外部作业触发器?
我的代码
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')
execute_notebook = PostgresOperator(
task_id='data_sql',
postgres_conn_id='REDSHIFT_CONN',
sql="SELECT * FROM athena_rs.shipments limit 5",
dag=dag
)
【问题讨论】:
-
@LuckyGuess 该示例显示一个任务另一个 dag 触发另一个任务中的另一个任务。在这里我认为他在看什么,完成一个 DAG 完全触发下一个 DAG。如果你能举个例子就好了。
-
我强烈建议使用
TriggerDagRunOperator来执行响应式触发,而不是ExternalTaskSensor来执行基于轮询的触发 -
@y2k-shubham,如果你能像下面写的那样写一个例子,它也会为其他人学习。我也面临同样的问题。
-
@pankaj 我添加了一个描述
TriggerDagRunOperator使用的答案
标签: python python-3.x airflow directed-acyclic-graphs airflow-scheduler