【发布时间】:2020-04-07 16:45:10
【问题描述】:
我目前正在开发一个 DAG,该 DAG 将通过电子邮件发送用户列表,无论 DAG 是成功完成还是失败。我试图让 DAG 的流程看起来像这里的示例:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
def print_hello():
return 'Hello world!'
default_args = {
'owner': 'peter',
'start_date':datetime(2018,8,11),
}
dag = DAG('hello_world', description='Simple tutorial DAG',
schedule_interval='* * * * *',
default_args = default_args, catchup=False)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
email_success = EmailOperator(
task_id='send_email',
to='to@gmail.com',
subject='Airflow Alert Success',
html_content=""" <h3>Email Test Success</h3> """,
dag=dag
)
email_failure = EmailOperator(
task_id='send_email',
to='to@gmail.com',
subject='Airflow Alert Failure',
html_content=""" <h3>Email Test Failed</h3> """,
dag=dag
)
hello_operator.set_downstream(email_success,email_failure)
是否有一个内置运算符,我可以使用气流来决定是否在 DAG 完成时发送 email_success 运算符,或者在 DAG 因任何原因失败时是否执行 email_failure 运算符?
谢谢
【问题讨论】: