【问题标题】:Kill downstream task depending on upstream task in airflow根据气流中的上游任务杀死下游任务
【发布时间】:2021-09-07 20:43:19
【问题描述】:

我有一个 DAG,其中有 2 个任务。

t1=  PythonOperator(
    task_id='Check_Files_in_S3',
    provide_context=False,
    python_callable=checkFilesInS3,
    xcom_push=True,
    dag=dag)

t2 =  PythonOperator(
    task_id='snowflakeLoad',
    provide_context=True,
    python_callable=snowflakeLoad,
    xcom_push=True,
    dag=dag)

t1>>t2

第一个任务在 S3 中查找文件,如果文件可用,则任务应该是成功的,并且应该使用下游任务将数据加载到雪花中。

我的要求是,如果文件不可用,那么 task1 需要发送邮件,那么它应该停止下游任务(task2)。有没有办法实现这一点,比如返回 false 会使下游任务失败?如果早期的 DAG 运行失败,depends_on_past 也不会运行 DAG?或者如果其中一项任务在当前运行中失败,它不会运行下游任务?

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    你有很多问题,所以让我分解一下。

    有没有办法实现这一点,比如返回 false 会使下游任务失败?

    等待某事发生的通常方法是使用传感器(继承自 BaseSensorOperator)类。它们专门用于低资源消耗和易于配置以感知外部对象/资源。 但是,在您的情况下,这是一个检查并忘记(如果它不存在,请不要重试)。然后你可以坚持使用 PythonOperator。您只需引发一个异常,它就会被视为任务失败,从而阻止下游任务(在您的情况下为 t2)运行。

    如果之前的 DAG 运行失败,depends_on_past 也不会运行 DAG?或者如果其中一项任务在当前运行中失败,它不会运行下游任务?

    如果我们有两个任务,假设我们有两个 DAGRun。一个在时间点x,一个在时间点x + 1。如果您有depends_on_past=True,则x + 1 的任务t1 的运行将在x 时查看运行t1,并且只有在运行成功时才会开始。 这同样适用于t2x + 1,它将检查x + 1 的任务t1 是否已完成,然后检查t2 的时间x 是否成功。

    【讨论】:

    • 所以 X+1 运行中的 T2 不依赖于 X 运行中的 T1。 X+1 的 T2 也不依赖于 X+1 的 T1 运行。对吗?
    • 传感器用例是您想要等待某事发生的时候。 op 用例不是等待文件出现。该操作提到用例是在文件不存在时停止下游任务并发送电子邮件。
    • 埃拉德说得好。我没有考虑到这一点。感谢您纠正。
    • Yes t2 in x + 1 运行不检查 t1 in x 的状态。这是因为t2x 依赖于t1x 因为t2t1 的下游这一事实自动考虑在内。
    【解决方案2】:

    当没有文件时,您可以在checkFilesInS3 中提出AirflowSkipException。这将导致下游任务在引发异常之前也被跳过,您也可以发送电子邮件。

    from airflow import DAG
    from datetime import datetime
    from airflow.exceptions import AirflowSkipException
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.utils.email import send_email
    
    
    def checkFilesInS3(**kwargs):
        your_code
        if no_files: #Raise exception to make downstream tasks skip
            send_email(to="someone@somewhere.com, subject="task failed!"")
            raise AirflowSkipException("No files found!")
    
    
    def snowflakeLoad(**kwargs):
        pass
    
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2021, 6, 23),
    
    }
    with DAG(dag_id='question',
             default_args=default_args,
             schedule_interval=None,
             catchup=False
             ) as dag:
    
        start = DummyOperator(task_id='start_task')
    
        t1 = PythonOperator(
            task_id='Check_Files_in_S3',
            python_callable=checkFilesInS3,
        )
    
        t2 = PythonOperator(
            task_id='snowflakeLoad',
            python_callable=snowflakeLoad,
        )
    
        start >> t1 >> t2
    

    【讨论】:

    • 谢谢@Elad。我不想将任务标记为失败。所以我根据文件的可用性使用 BranchPythonOperator。如果文件不可用,我将选择提出事件的任务。
    • 跳过不是失败。作为正常 ETL 流程的一部分,任务可以处于跳过状态。由于没有要处理的文件,因此只需跳过任务即可。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-05-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多