【问题标题】:Airflow Email Operator Success / Failure气流电子邮件操作员成功/失败
【发布时间】:2020-04-07 16:45:10
【问题描述】:

我目前正在开发一个 DAG,该 DAG 将通过电子邮件发送用户列表,无论 DAG 是成功完成还是失败。我试图让 DAG 的流程看起来像这里的示例:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

def print_hello():
    return 'Hello world!'

default_args = {
        'owner': 'peter',
        'start_date':datetime(2018,8,11),
}

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='* * * * *',
          default_args = default_args, catchup=False)

hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

email_success = EmailOperator(
        task_id='send_email',
        to='to@gmail.com',
        subject='Airflow Alert Success',
        html_content=""" <h3>Email Test Success</h3> """,
        dag=dag
)

email_failure = EmailOperator(
        task_id='send_email',
        to='to@gmail.com',
        subject='Airflow Alert Failure',
        html_content=""" <h3>Email Test Failed</h3> """,
        dag=dag
)

hello_operator.set_downstream(email_success,email_failure)

是否有一个内置运算符,我可以使用气流来决定是否在 DAG 完成时发送 email_success 运算符,或者在 DAG 因任何原因失败时是否执行 email_failure 运算符?

谢谢

【问题讨论】:

    标签: python email airflow


    【解决方案1】:

    我在搜索如何让 Airflow 在成功后向我发送电子邮件时遇到了这个问题。

    Airflow 能够在 failure重试 时通过 default_args 中的以下内容发送电子邮件。

    default_args = {
        'email': ['some_email@gmail.com'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
     }
    

    这应该有助于解决您关于失败的部分问题,并且您可以使用您的“email_success”EmailOperator 作为序列中的最后一个任务。

    【讨论】:

    • 所以这个想法是无论如何都会发送一封电子邮件。如果 dag 在最后一个运算符(即 email_success 运算符)之前的一步失败,它将发送一封说明失败的电子邮件,然后在到达最后一步之前停止。但是如果它一直完成,那么 email_success 操作符将最后执行,然后不会发送失败电子邮件?
    • 另外,如果 DAG 失败,是否有办法根据用户输入(例如 API 调用或配置文件)修改电子邮件列表?
    • 是的,完全正确。您可以通过在 default_args 和成功 EmailOperator 'to=' 中指定不同的电子邮件地址,将失败电子邮件发送到一个电子邮件帐户并将成功电子邮件发送到另一个帐户。关于,根据代码本身出现的情况修改电子邮件列表 - 你需要更复杂的东西。
    • 好的,谢谢。是否还有可以触发的操作符,以便修改失败邮件?
    • 不,我不这么认为。这是一般的失败电子邮件。我认为您需要为您正在寻找的条件创建特定的 EmailOperators。我不确定。
    【解决方案2】:

    我认为您可以使用BranchOperator 来决定发送有关失败或成功的电子邮件。我有同样的情况,而我发送一封失败的电子邮件,如果成功则运行一个 DummyOperator。

    您可以查看有关 branchOperator How does Airflow's BranchPythonOperator work? 的相关问题

    【讨论】:

    • 如果 DAG 在分支运算符之前的一步失败,是否意味着不会发送电子邮件?
    • 嗯,这将取决于您的 trigger_rule。对于我的示例,如果我的 dag 至少有一个失败的任务(trigger_rule='one_fail'),它将发送一封电子邮件,否则它不会并且将跳过 branchingOperator 上的'send_email' 任务。当它变成粉红色时,您可以看到它跳过它。
    【解决方案3】:

    我猜你要找的是Trigger Rules

    您可以将无故障时应发送的电子邮件的触发规则设置为all_success(默认),将故障时应发送的电子邮件的触发规则设置为all_failed

    【讨论】:

      猜你喜欢
      • 2022-06-17
      • 2021-09-17
      • 1970-01-01
      • 2019-02-05
      • 1970-01-01
      • 2020-01-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多