【发布时间】:2021-07-27 14:06:50
【问题描述】:
我是气流新手,所以在这里我有一个疑问。
如果满足第一个任务的条件,我想运行 DAG。如果条件不满足,我想在第一个任务之后停止 dag。
例子:
# first task
def get_number_func(**kwargs):
number = randint(0, 10)
print(number)
if (number >= 5):
print('A')
return 'continue_task'
else:
#STOP DAG
# second task if number is higher or equal 5
def continue_func(**kwargs):
print("The number is " + str(number))
# first task declaration
start_op = BranchPythonOperator(
task_id='get_number',
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
dag=DAG,
)
# second task declaration
continue_op = PythonOperator(
task_id='continue_task',
provide_context=True,
python_callable=continue_func,
op_kwargs={},
dag=DAG,
)
start_op >> continue_op
如果满足 number 的条件,我只运行第二个任务。如果条件未验证,则 DAG 不应运行第二个任务。
我该怎么做?我不想使用 xcom、全局变量或虚拟任务。
提前致谢!
【问题讨论】:
-
请通过停止 DAG 来定义您的意思。你指的是具体的运行吗?您的意思是成功完成工作流程吗?