【问题标题】:How to mark an Airflow DAG run as failed if any task fails?如果任何任务失败,如何将 Airflow DAG 运行标记为失败?
【发布时间】:2018-04-27 05:05:53
【问题描述】:

如果任何任务失败,是否有可能使 Airflow DAG 失败?

我通常在 DAG 结束时有一些清理任务,就像现在一样,只要最后一个任务成功,整个 DAG 就会被标记为成功。

【问题讨论】:

  • 这可能是相关的,但并不相同,在我的情况下,任务已执行但整个 dag 被标记为成功而不是失败
  • 我也有同样的问题。我认为这是一个错误。

标签: airflow


【解决方案1】:

另一种解决方案是添加一个最终的 PythonOperator,用于检查本次运行中所有任务的状态:

final_status = PythonOperator(
    task_id='final_status',
    provide_context=True,
    python_callable=final_status,
    trigger_rule=TriggerRule.ALL_DONE, # Ensures this task runs even if upstream fails
    dag=dag,
)

def final_status(**kwargs):
    for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.current_state() != State.SUCCESS and \
                task_instance.task_id != kwargs['task_instance'].task_id:
            raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))

【讨论】:

  • 我更喜欢这个解决方案,因为我可以简单地编写这个运算符一次,然后简单地将 is 放在任何 DAG 的末尾,而不需要更改任何运算符。
【解决方案2】:

面临类似的问题。这不是错误,但将此属性添加到 Dag 可能是一个不错的功能。

作为一种解决方法,我可以在允许失败的任务期间推送 XCOM 变量,并在下游任务中执行类似的操作

if ti.xcom_pull(key='state', task_ids=task_allowed_to_fail_id) == 'FAILED': raise ValueError('Force failure because upstream task has failed')

【讨论】:

    猜你喜欢
    • 2017-06-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-18
    • 1970-01-01
    • 1970-01-01
    • 2021-06-05
    相关资源
    最近更新 更多