【发布时间】:2021-03-17 01:06:17
【问题描述】:
我必须安排一个 DAG,它应该在每个月的 15 日运行。但是,如果 15 日是周日/周六,那么 DAG 应该跳过周末并在接下来的周一运行。
例如,2021 年 5 月 15 日是星期六。因此,DAG 应该在 17 日(星期一)运行,而不是在 5 月 15 日运行。
你能帮忙安排一下气流吗?
提前致谢!
【问题讨论】:
我必须安排一个 DAG,它应该在每个月的 15 日运行。但是,如果 15 日是周日/周六,那么 DAG 应该跳过周末并在接下来的周一运行。
例如,2021 年 5 月 15 日是星期六。因此,DAG 应该在 17 日(星期一)运行,而不是在 5 月 15 日运行。
你能帮忙安排一下气流吗?
提前致谢!
【问题讨论】:
调度的逻辑受限于你可以用单个 cron 表达式做什么。所以如果你不能在 cron 表达式中说出来,你就不能在 Airflow 中提供这样的调度。出于这个原因,有一个开放的气流改进提案AIP-39 Richer scheduler_interval 以提供更多的调度功能。
也就是说,您仍然可以通过编写一些代码来获得所需的功能。 您可以将您的 dag 设置为每月 15 日开始,然后放置一个传感器来验证日期是周一至周五(如果不是,它将等待):
from airflow.sensors.weekday import DayOfWeekSensor
dag = DAG(
dag_id='work',
schedule_interval='0 0 15 * *',
default_args=default_args,
description='Schedule a Job on 15 of each month',
)
weekend_check = DayOfWeekSensor(
task_id='weekday_check_task',
week_day={'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'},
mode='reschedule',
dag=dag)
op_1 = YourOperator(task_id='op1_task',dag=dag)
weekend_check >> op_1
注意:如果您正在运行airflow<2.0.0,则需要将导入更改为:
from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor
【讨论】:
Elad 发布的答案效果很好。我想出了另一个同样有效的解决方案。
我将作业安排在本月的 15、16 和 17 日运行。但是,我添加了一个条件,以便作业在工作日的 15 日运行。如果是星期一,该作业将在 16 日和 17 日运行。
为此,我添加了一个 BranchPythonOperator:
from airflow.operators.python_operator import BranchPythonOperator
def _conditinal_task_initiator(**kwargs):
execution_date=kwargs['execution_date']
if int(datetime.strftime(execution_date,'%d'))==15 and (execution_date.weekday()<5):
return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==16 and (execution_date.weekday()==0):
return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==17 and (execution_date.weekday()==0):
return 'dummy_task_run_cmo_longit'
else:
return 'dummy_task_skip_cmo_longit'
with DAG(dag_id='NXS_FM_LOAD_CMO_CHOICE_LONGIT',default_args = default_args, schedule_interval = "0 8 15-17 * *", catchup=False) as dag:
conditinal_task_initiator=BranchPythonOperator(
task_id='cond_task_check_day',
provide_context=True,
python_callable=_conditinal_task_initiator,
do_xcom_push=False)
dummy_task_run_cmo_longit=DummyOperator(
task_id='dummy_task_run_cmo_longit')
dummy_task_skip_cmo_longit=DummyOperator(
task_id='dummy_task_skip_cmo_longit')
conditinal_task_initiator >> [dummy_task_run_cmo_longit,dummy_task_skip_cmo_longit]
dummy_task_run_cmo_longit >> <main tasks for execution>
使用这个,作业将在每个月的 15,16 和 17 运行。但是,它每月只运行一次功能任务。
【讨论】: