【问题标题】:Apache Airflow DAG doesn't call on_success_callback and on_failure_callbackApache Airflow DAG 不调用 on_success_callback 和 on_failure_callback
【发布时间】:2019-09-13 20:35:15
【问题描述】:

我想自定义我的 DAG 以在电子邮件失败或成功时发送电子邮件。我正在尝试在 DAG 构造函数中使用 on_success_callback 和 on_failure_callback,但它不适用于 DAG。同时它适用于我放入 DAG 中的 DummyOperator。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

from utils import get_report_operator, DagStatus


TEST_DAG_NAME='test_dag'
TEST_DAG_REPORT_SUBSCRIBERS = ['MY_EMAIL']

def send_success_report(context):
    subject = 'Airflow report: {0} run success'.format(TEST_DAG_NAME)
    email_operator = get_report_operator(subject, TEST_DAG_REPORT_SUBSCRIBERS, TEST_DAG_NAME, DagStatus.SUCCESS)
    email_operator.execute(context)

def send_failed_report(context):
    subject = 'Airflow report: {0} run failed'.format(TEST_DAG_NAME)
    email_operator = get_report_operator(subject, TEST_DAG_REPORT_SUBSCRIBERS, TEST_DAG_NAME, DagStatus.FAILED)
    email_operator.execute(context)


dag = DAG(dag_id=TEST_DAG_NAME,
          schedule_interval=None,
          start_date=datetime(2019,6,6),
          on_success_callback=send_success_report,
          on_failure_callback=send_failed_report)

DummyOperator(task_id='task',
              on_success_callback=send_success_report,
              on_failure_callback=send_failed_report,
              dag = dag)

我还在 Airflow EmailOperator 上实现了一些插件来发送报告。我不认为这部分有这个错误,但仍然如此。

class DagStatus(Enum):
    SUCCESS = 0
    FAILED = 1


def get_report_operator(sbjct, to_lst, dag_id, dag_status):
    status = 'SUCCESS' if dag_status == DagStatus.SUCCESS else 'FAILED'
    status_color = '#87C540' if dag_status == DagStatus.SUCCESS else '#FF1717'
    with open(os.path.join(os.path.dirname(__file__), 'airflow_report.html'), 'r', encoding='utf-8') as report_file:
        report_mask = report_file.read()
    report_text = report_mask.format(dag_id, status, status_color)
    tmp_dag = DAG(dag_id='tmp_dag', start_date=datetime(year=2019, month=9, day=12), schedule_interval=None)
    return EmailOperator(task_id='send_email',
                    to=to_lst,
                    subject=sbjct,
                    html_content=report_text.encode('utf-8'),
                    dag = tmp_dag)

我做错了什么?

【问题讨论】:

    标签: python callback airflow


    【解决方案1】:

    而是将on_failure_callback 作为default_args 字典中的参数并将其传递给DAG。 defaut_args 中传递给 DAG 的所有参数都将应用于 DAG 的所有运算符。到目前为止,这是对 DAG 中的所有运算符应用公共参数的唯一方法。

    dag = DAG(dag_id=TEST_DAG_NAME,      
              schedule_interval=None,
              start_date=datetime(2019,6,6),
              default_args={
                  'on_success_callback': send_success_report,
                  'on_failure_callback': send_failed_report
              })
    

    【讨论】:

    • 如果有升级到 Airflow 2.0 的计划,请务必避免在 default_args 中使用不适用于所有 Operator 的参数;否则,版本 2.0+ 将导致抛出错误。除非您更改 airflow.cfg 文件中的特定设置。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-13
    • 1970-01-01
    • 2018-07-01
    • 2021-06-28
    • 2022-12-04
    • 1970-01-01
    相关资源
    最近更新 更多