【问题标题】:Airflow - Stop DAG based on condition (skip remaining tasks after branch)气流 - 根据条件停止 DAG(分支后跳过剩余任务)
【发布时间】:2021-07-27 14:06:50
【问题描述】:

我是气流新手,所以在这里我有一个疑问。

如果满足第一个任务的条件,我想运行 DAG。如果条件不满足,我想在第一个任务之后停止 dag。

例子:

# first task
def get_number_func(**kwargs):

    number = randint(0, 10)
    print(number)
    
    if (number >= 5):
        print('A')
        return 'continue_task'
    else:
        #STOP DAG
        
# second task if number is higher or equal 5
def continue_func(**kwargs):
    print("The number is " + str(number))
    
# first task declaration
start_op = BranchPythonOperator(
    task_id='get_number',
    provide_context=True,
    python_callable=get_number_func,
    op_kwargs={},
    dag=DAG,
)

# second task declaration
continue_op = PythonOperator(
    task_id='continue_task',
    provide_context=True,
    python_callable=continue_func,
    op_kwargs={},
    dag=DAG,
)

start_op  >> continue_op 

如果满足 number 的条件,我只运行第二个任务。如果条件未验证,则 DAG 不应运行第二个任务。

我该怎么做?我不想使用 xcom、全局变量或虚拟任务。

提前致谢!

【问题讨论】:

  • 请通过停止 DAG 来定义您的意思。你指的是具体的运行吗?您的意思是成功完成工作流程吗?

标签: python bigdata airflow


【解决方案1】:

您查看过ShortCircuitOperator 吗?此任务根据条件是 True 还是 False 来控制您的任务流。如果条件为 True,则下游任务将继续。否则,将跳过所有下游任务。尝试将您的第一个任务更改为 ShortCircuitOperator 并更新 get_number_func 函数以返回 True 或 False。

这是我使用您的代码进行的测试:

from airflow.decorators import dag, task
from airflow.models import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator

from datetime import datetime


default_args = dict(
    start_date=datetime(2021, 4, 26),
    owner="me",
    retries=0,
)

dag_args = dict(
    dag_id="short_circuit",
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
)


def get_number_func(**kwargs):
    from random import randint

    number = randint(0, 10)
    print(number)

    if number >= 5:
        print("A")
        return True
    else:
        # STOP DAG
        return False


def continue_func(**kwargs):
    pass


with DAG(**dag_args) as dag:
    # first task declaration
    start_op = ShortCircuitOperator(
        task_id="get_number",
        provide_context=True,
        python_callable=get_number_func,
        op_kwargs={},
    )

    # second task declaration
    continue_op = PythonOperator(
        task_id="continue_task",
        provide_context=True,
        python_callable=continue_func,
        op_kwargs={},
    )

    start_op >> continue_op

【讨论】:

  • 嗨乔希,这正是我要找的。非常感谢!上帝的工作!
  • 不客气!你介意选择这个作为接受的答案吗?将来会对其他人有所帮助。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-08-05
  • 1970-01-01
  • 1970-01-01
  • 2017-01-01
  • 1970-01-01
  • 2017-06-19
  • 1970-01-01
相关资源
最近更新 更多