【问题标题】:Run code before all Airflow DAGs在所有 Airflow DAG 之前运行代码
【发布时间】:2017-08-21 20:58:51
【问题描述】:

我正在使用相对较新的airflow 项目。我编写并运行了一堆 DAG。现在我想集成一个错误报告服务,这样如果任何 DAG 中的任何代码引发异常,信息都会发送到某个 API。我可以将 API 调用放在每个 DAG 的 on_failure_callback 中,但我需要执行像 bug_reporter.init(bug_reporter_token) 这样只需要运行一次的初始化行。

Airflow 中有用于初始化代码的地方吗?现在我在每个 DAG 定义文件的开头初始化错误跟踪器。这似乎是多余的,但我找不到在定义 DAG 之前编写运行文件的地方。我已经尝试阅读有关 plugins 的信息,但它似乎不存在。

【问题讨论】:

    标签: airflow


    【解决方案1】:

    在您的 DAG 定义文件中,使用您自己的子类代替 DAG:

    from airflow.utils.decorators import apply_defaults
    import bug_reporter
    
    class DAGWithBugReporter(DAG):
        @apply_defaults
        def __init__(
            self,
            bug_reporter_token,
            *args, **kwargs):
    
            super(DAGWithBugReporter, self).__init__(*args, **kwargs)
            bug_reporter.init(bug_reporter_token)
    

    然后在你的 dag 定义中:

    dag = DAGWithBugReporter(
        dag_id='my_dag',
        schedule_interval=None,
        start_date=datetime(2017, 2, 26),
        bug_reporter_token=my_token_from_somewhere
    )
    
    
    t1 = PythonOperator(
        task_id='t1',
        provide_context=True,
        python_callable=my_callable,
        xcom_push=True,
        dag=dag)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-08-12
      • 2018-07-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多