【问题标题】:schedule a monthly DAG to run on the next weekday安排每月 DAG 在下一个工作日运行
【发布时间】:2021-03-17 01:06:17
【问题描述】:

我必须安排一个 DAG,它应该在每个月的 15 日运行。但是,如果 15 日是周日/周六,那么 DAG 应该跳过周末并在接下来的周一运行。

例如,2021 年 5 月 15 日是星期六。因此,DAG 应该在 17 日(星期一)运行,而不是在 5 月 15 日运行。

你能帮忙安排一下气流吗?

提前致谢!

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    调度的逻辑受限于你可以用单个 cron 表达式做什么。所以如果你不能在 cron 表达式中说出来,你就不能在 Airflow 中提供这样的调度。出于这个原因,有一个开放的气流改进提案AIP-39 Richer scheduler_interval 以提供更多的调度功能。

    也就是说,您仍然可以通过编写一些代码来获得所需的功能。 您可以将您的 dag 设置为每月 15 日开始,然后放置一个传感器来验证日期是周一至周五(如果不是,它将等待):

    from airflow.sensors.weekday import DayOfWeekSensor
    dag = DAG(
        dag_id='work',
        schedule_interval='0 0 15 * *',
        default_args=default_args,
        description='Schedule a Job on 15 of each month',
    )
    
    weekend_check = DayOfWeekSensor(
        task_id='weekday_check_task',
        week_day={'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'},
        mode='reschedule',
        dag=dag)
    
    op_1 = YourOperator(task_id='op1_task',dag=dag)
    
    weekend_check >> op_1
    

    注意:如果您正在运行airflow<2.0.0,则需要将导入更改为:

    from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor
    

    【讨论】:

    • 这是一个智能工作解决方案。感谢您抽出时间来解决我的问题。
    【解决方案2】:

    Elad 发布的答案效果很好。我想出了另一个同样有效的解决方案。

    我将作业安排在本月的 15、16 和 17 日运行。但是,我添加了一个条件,以便作业在工作日的 15 日运行。如果是星期一,该作业将在 16 日和 17 日运行。

    为此,我添加了一个 BranchPythonOperator:

    from airflow.operators.python_operator import BranchPythonOperator
    
    def _conditinal_task_initiator(**kwargs):
    execution_date=kwargs['execution_date']
    if int(datetime.strftime(execution_date,'%d'))==15 and (execution_date.weekday()<5):
        return 'dummy_task_run_cmo_longit'
    elif int(datetime.strftime(execution_date,'%d'))==16 and (execution_date.weekday()==0):
        return 'dummy_task_run_cmo_longit'
    elif int(datetime.strftime(execution_date,'%d'))==17 and (execution_date.weekday()==0):
        return 'dummy_task_run_cmo_longit'
    else:
        return 'dummy_task_skip_cmo_longit'
    
    with DAG(dag_id='NXS_FM_LOAD_CMO_CHOICE_LONGIT',default_args = default_args, schedule_interval = "0 8 15-17 * *", catchup=False) as dag:
    conditinal_task_initiator=BranchPythonOperator(
        task_id='cond_task_check_day',
        provide_context=True,
        python_callable=_conditinal_task_initiator,
        do_xcom_push=False)
    dummy_task_run_cmo_longit=DummyOperator(
        task_id='dummy_task_run_cmo_longit')
    dummy_task_skip_cmo_longit=DummyOperator(
        task_id='dummy_task_skip_cmo_longit')
    
    conditinal_task_initiator >> [dummy_task_run_cmo_longit,dummy_task_skip_cmo_longit]
    
    dummy_task_run_cmo_longit >> <main tasks for execution>
    

    使用这个,作业将在每个月的 15,16 和 17 运行。但是,它每月只运行一次功能任务。

    【讨论】:

    • 你可能想看看我最近添加的 BranchDayOfWeekOperator:github.com/apache/airflow/pull/13997
    • @Elad,我正在使用 apache 气流 1.10.12。此版本中不存在 BranchDayOfWeekOperator。看起来这门课会更优雅地解决我的问题。
    • 您可以随时将运算符复制到您的代码中并使用它。如果以及何时升级,您可以删除代码并将其替换为从气流核心导入。我只是建议一个选项。
    猜你喜欢
    • 1970-01-01
    • 2014-11-09
    • 2020-12-07
    • 1970-01-01
    • 1970-01-01
    • 2011-03-15
    • 2021-10-03
    相关资源
    最近更新 更多