【Question title】: crontab expression to schedule a self-triggering DAG in Apache Airflow
【Posted】: 2021-06-09 04:49:09
【Question】:

I have a DAG that uses a TriggerDagRunOperator to trigger itself. The DAG code is shared below.

from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator


default_args = {
    'owner': 'ownername',
    'depends_on_past': False,
    'start_date': datetime(2021,3,2,10,1),
    'email': ['***@mail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
with DAG('self_trigger_dag', schedule_interval=None, max_active_runs=1, catchup=False, default_args=default_args) as dag:
    sleep_task = BashOperator(
        task_id='sleep_task',
        bash_command='sleep 180',
        dag=dag,
    )

    # BashOperator takes no use_legacy_sql argument (that is a BigQuery option), so it is dropped here.
    bash_command = BashOperator(
        task_id='run_command',
        bash_command="my bash_command",
        dag=dag,
    )
    
    dag_trigger = TriggerDagRunOperator(
        task_id='trigger_self',
        trigger_dag_id='self_trigger_dag',
        dag=dag)

    sleep_task >> bash_command >> dag_trigger

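The requirement is that the DAG should run only between 8 AM and 9 PM. I cannot use a schedule expression such as '* 8-21 * * *', because this is a self-triggering DAG. Please help me with the right crontab expression or suggest an alternative approach.

Thanks in advance.

【Comments】:

    Tags: cron airflow-scheduler directed-acyclic-graphs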


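    【Solution 1】:

    I was able to meet the requirement with a separate time_check DAG that controls my main DAG. Here, time_check triggers the main DAG at 12 PM and turns it off at 1 AM (both times are UTC in the code).

    The time_check DAG code is shared below: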

    from datetime import timedelta, datetime,timezone,date
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.python_operator import BranchPythonOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.models import Variable
    
    
    default_args = {
        'owner': 'owner',
        'depends_on_past': False,
        'start_date': datetime(2021,3,2,10,1),
        'email': ['xxx@xxx.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
    
    airflow_variable = 'stream_dag_status'
    
    
    def check_current_time(**context):
        # Compare the wall-clock time in UTC against a window of 12:00 today to 01:00 tomorrow.
        now_utc = datetime.now(timezone.utc)
        start_utc = now_utc.replace(hour=12, minute=0, second=0, microsecond=0)
        end_utc = now_utc.replace(hour=1, minute=0, second=0, microsecond=0) + timedelta(days=1)
    
        if start_utc <= now_utc < end_utc:
            # The 12:00 run lands here: set the flag and follow the start_stream branch.
            Variable.set(airflow_variable, 'START')
            return 'start_stream'
        # The 01:00 run lands here: follow the update_variable branch, which sets STOP.
        return 'update_variable'
    
    def set_airflow_variable(**context):
        Variable.set(airflow_variable, 'STOP')
    
    with DAG('time_check', schedule_interval='0 12,1 * * *', max_active_runs=1, catchup=False,
             default_args=default_args) as dag:
        check_current_time = BranchPythonOperator(task_id='check_current_time', python_callable=check_current_time,
                                        provide_context=True,
                                        dag=dag)
    
        start_stream = TriggerDagRunOperator(
            task_id='start_stream',
            trigger_dag_id='STREAMING_TEST',
            dag=dag)
    
        update_variable = PythonOperator(task_id='update_variable', python_callable=set_airflow_variable,
                                        provide_context=True,
                                        dag=dag)
    
        stop_stream_email = EmailOperator(task_id='stop_stream_email', to='xxx@xxx.com',
                                          subject='Streaming DAG is OFF now',
                                          html_content="<p>Hi,<br><br>Turning streaming DAG to OFF state<br>", dag=dag)
    
        check_current_time >> start_stream
        check_current_time >> update_variable >> stop_stream_email
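
The window test in check_current_time above gives the intended answer at the two scheduled times (12:00 and 01:00 UTC), but it would report "outside" at, say, 00:30 UTC, because start_utc is always today's 12:00. If the check ever needs to run at arbitrary times, one possible sketch is a plain function that compares only the time of day and handles windows crossing midnight. The name in_daily_window and its parameters are hypothetical and not part of the original answer:

```python
from datetime import datetime, time, timezone


def in_daily_window(now, start, end):
    """Return True if now's time of day lies in [start, end).

    Handles windows that cross midnight (start later than end),
    such as 12:00 -> 01:00.
    """
    t = now.time()  # time of day only; the date and tzinfo are ignored
    if start <= end:
        # Same-day window, e.g. 08:00 -> 21:00
        return start <= t < end
    # Window wraps past midnight, e.g. 12:00 -> 01:00
    return t >= start or t < end


if __name__ == "__main__":
    now_utc = datetime.now(timezone.utc)
    # The window from the question: 8 AM to 9 PM
    print(in_daily_window(now_utc, time(8), time(21)))
```

A branch callable could return 'start_stream' when this function is true and 'update_variable' otherwise. Which timezone "now" should be taken in is left to the caller.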
    

    The self-triggering DAG code is shared below:

    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.models import Variable
    
    default_args = {
        'owner': 'owner',
        'depends_on_past': False,
        'start_date': datetime(2021,3,2,10,1),
        'email': ['xxx@xxx.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
    airflow_variable = 'stream_dag_status'
    
    def check_airflow_variable(**context):
        # Keep looping only while time_check has left the flag at 'START'.
        status = Variable.get(airflow_variable)
        if status == 'START':
            return 'sleep_task'
        # Any other value ends the loop: notify by email instead of re-triggering.
        return 'email_notify'
            
    
    
    with DAG('STREAMING_TEST', schedule_interval=None, max_active_runs=1, catchup=False, default_args=default_args) as dag:
        
        check_airflow_variable = BranchPythonOperator(task_id='check_airflow_variable', python_callable=check_airflow_variable,
                                        provide_context=True,
                                        dag=dag)    
        sleep_task = BashOperator(
            task_id='sleep_task',
            bash_command='sleep 60',
            dag=dag,
        )
        start_group = DummyOperator(task_id='start_split', depends_on_past=False)
    
        dag_trigger = TriggerDagRunOperator(
            task_id='trigger_self',
            trigger_dag_id='STREAMING_TEST',
            dag=dag)
    
        email_notify = EmailOperator(task_id='email_notify', to='xxx@xxx.com',
                                          subject='Variable value is STOP',
                                          html_content="<p>Hi,<br><br>Streaming is stopped<br>", dag=dag)
    
        check_airflow_variable >> sleep_task >> start_group >> dag_trigger
        check_airflow_variable >> email_notify
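
One thing to watch in check_airflow_variable above: Variable.get raises a KeyError when the variable does not exist yet, for example if STREAMING_TEST is triggered by hand before time_check has ever run. Airflow's Variable.get accepts a default_var argument for this case. The sketch below isolates the branching decision in a small function so it can be checked without Airflow; the name choose_branch and the choice of 'STOP' as the fallback are assumptions, not part of the original answer:

```python
def choose_branch(status):
    """Map the flag value to the task_id the branch operator should follow.

    Only the exact value 'START' keeps the loop going; anything else,
    including None, ends it with the notification task.
    """
    if status == "START":
        return "sleep_task"
    return "email_notify"


# Inside the DAG file, the callable could then read the flag with a fallback
# (assumption: treating a missing variable as 'STOP' is the desired behavior):
#
#     def check_airflow_variable(**context):
#         status = Variable.get("stream_dag_status", default_var="STOP")
#         return choose_branch(status)

if __name__ == "__main__":
    print(choose_branch("START"))
    print(choose_branch("STOP"))
```

Defaulting to 'STOP' means a missing variable stops the loop rather than failing the task; defaulting to 'START' would be the opposite trade-off.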
    

    【Comments】:
