【问题标题】:How to Mute Failure for a Specific Airflow Task?如何为特定的气流任务静音失败?
【发布时间】:2018-01-23 20:02:55
【问题描述】:

我在 here 描述的气流中设置了一个条件任务。它所做的只是检查配置单元分区是否存在。如果是,则继续其余任务,如果不是,请先添加分区,然后再继续。条件检查任务可以失败也可以成功,两者都可以。但是,我为 dag 设置了寻呼机职责电子邮件通知,因为我想知道下游任务何时失败。如何在该特定条件任务上静音失败通知,以免我在寻呼任务时收到误报?

【问题讨论】:

  • 如何触发 PagerDuty 警报?如果它与on_failure_callbackemail_on_failure 一起使用,您应该可以将它们设置为None 以完成该特定任务。
  • 它使用 email_on_failure,作为 default_args 的一部分。 dag=dag 适用于所有任务。如果我将其设置为 None,我将无法在同一 dag 中找出其他任务失败。
  • 是的,所以您应该能够将其设置在default_args 中,以便您的所有其他任务都有警报,然后在您定义一个任务时覆盖它,即BashOperator(task_id='foo', dag=dag, on_failure_callback=None)
  • 它不起作用
  • 抱歉,您尝试了BashOperator(task_id='foo', dag=dag, email_on_failure =None)?抱歉 on_failure_callback 是我上次评论中的错字。

标签: python airflow pagerduty


【解决方案1】:

email_on_failureon_failure_callback 等是任务(操作员)级别的参数。它们继承自 DAG 对象,您传递给 DAG 的值为 default_args,但您也可以在初始化时覆盖它们。

YourOperator(task_id='task1', dag=dag, email_on_failure=None, on_failure_callback=None, ...)

这里是当任务失败时气流如何处理这些回调的源代码,让你更清楚它是如何工作的。

def handle_failure(self, error, test_mode=False, context=None):
        self.log.exception(error)
        task = self.task
        session = settings.Session()
        self.end_date = datetime.utcnow()
        self.set_duration()
        Stats.incr('operator_failures_{}'.format(task.__class__.__name__), 1, 1)
        Stats.incr('ti_failures')
        if not test_mode:
            session.add(Log(State.FAILED, self))

        # Log failure duration
        session.add(TaskFail(task, self.execution_date, self.start_date, self.end_date))

        # Let's go deeper
        try:
            # Since this function is called only when the TI state is running,
            # try_number contains the current try_number (not the next). We
            # only mark task instance as FAILED if the next task instance
            # try_number exceeds the max_tries.
            if task.retries and self.try_number <= self.max_tries:
                self.state = State.UP_FOR_RETRY
                self.log.info('Marking task as UP_FOR_RETRY')
                if task.email_on_retry and task.email:
                    self.email_alert(error, is_retry=True)
            else:
                self.state = State.FAILED
                if task.retries:
                    self.log.info('All retries failed; marking task as FAILED')
                else:
                    self.log.info('Marking task as FAILED.')
                if task.email_on_failure and task.email:
                    self.email_alert(error, is_retry=False)
        except Exception as e2:
            self.log.error('Failed to send email to: %s', task.email)
            self.log.exception(e2)

        # Handling callbacks pessimistically
        try:
            if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
                task.on_retry_callback(context)
            if self.state == State.FAILED and task.on_failure_callback:
                task.on_failure_callback(context)
        except Exception as e3:
            self.log.error("Failed at executing callback")
            self.log.exception(e3)

        if not test_mode:
            session.merge(self)
        session.commit()
        self.log.error(str(error))

https://airflow.apache.org/_modules/airflow/models.html#BaseOperator

【讨论】:

    猜你喜欢
    • 2017-08-24
    • 1970-01-01
    • 1970-01-01
    • 2023-03-30
    • 2020-11-14
    • 1970-01-01
    • 1970-01-01
    • 2021-05-06
    • 1970-01-01
    相关资源
    最近更新 更多