【发布时间】:2022-02-22 22:56:39
【问题描述】:
我有一个循环来创建任务的列表。该列表的大小是静态的。
for counter, account_id in enumerate(ACCOUNT_LIST):
task_id = f"bash_task_{counter}"
if account_id:
trigger_task = BashOperator(
task_id=task_id,
bash_command="echo hello there",
dag=dag)
else:
trigger_task = BashOperator(
task_id=task_id,
bash_command="echo hello there",
dag=dag)
trigger_task.status = SKIPPED # is there way to somehow set status of this to skipped instead of having a branch operator?
trigger_task
我手动尝试过,但无法跳过任务:
start = DummyOperator(task_id='start')
task1 = DummyOperator(task_id='task_1')
task2 = DummyOperator(task_id='task_2')
task3 = DummyOperator(task_id='task_3')
task4 = DummyOperator(task_id='task_4')
start >> task1
start >> task2
try:
start >> task3
raise AirflowSkipException
except AirflowSkipException as ase:
log.error('Task Skipped for task3')
try:
start >> task4
raise AirflowSkipException
except AirflowSkipException as ase:
log.error('Task Skipped for task4')
【问题讨论】:
-
@alltej 你把它弄糊涂了;
AirflowSkipException必须从您的操作员代码中提出(而不是像您在此处所做的那样在您的DAG定义代码中提出)。您在这里尝试做的事情尚不清楚;但也是不可能的(你不能在 DAG 定义期间标记任务的状态,因为它还没有运行)。要以预定义的方式跳过任务,您可以 [1] (更容易) 创建并有条件地将它们连接在一起 [2] 或使用BranchPythonOperator/ShortCircuitOperator -
你有代码示例吗?
-
我试图避免使用 BranchOperator 因为它看起来太过分了
标签: airflow