我通过一个专门用来控制主 DAG 运行的 time_check DAG 实现了这一需求。在这里,time_check DAG 会在中午 12 点触发主 DAG,并在凌晨 1 点将其关闭(代码中按 UTC 时间判断)。
time_check DAG 代码分享如下:
from datetime import timedelta, datetime,timezone,date
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable
# Task-level defaults handed to the DAG constructor via ``default_args``.
default_args = dict(
    owner='owner',
    depends_on_past=False,
    start_date=datetime(2021, 3, 2, 10, 1),
    email=['xxx@xxx.com'],
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Key of the Airflow Variable that the callables below set to
# 'START' or 'STOP'.
airflow_variable = 'stream_dag_status'
def _in_stream_window(now_utc, start_hour=12, end_hour=1):
    """Return True when *now_utc* lies in the daily streaming window.

    The window opens at ``start_hour``:00 (inclusive) and closes at
    ``end_hour``:00 (exclusive). When ``start_hour`` is greater than
    ``end_hour`` the window wraps past midnight, e.g. 12:00 -> 01:00 of
    the following day. Equal hours yield an empty window.
    """
    hour = now_utc.hour
    if start_hour <= end_hour:
        return start_hour <= hour < end_hour
    # Wrap-around window: the evening part OR the after-midnight part.
    return hour >= start_hour or hour < end_hour


def check_current_time(**context):
    """Branch callable that picks the next task from the current UTC time.

    Inside the 12:00-01:00 UTC window the Airflow Variable named by
    ``airflow_variable`` is set to 'START' and the task id
    'start_stream' is returned. Outside the window the task id
    'update_variable' is returned and the Variable is left untouched.

    Args:
        **context: Airflow task context; not used.

    Returns:
        str: 'start_stream' or 'update_variable'.
    """
    now_utc = datetime.now(timezone.utc)
    # Compare on hour-of-day rather than against "today 12:00 .. tomorrow
    # 01:00": that form never matched 00:00-00:59, which belongs to the
    # window opened the previous day, and its upper bound could never be
    # reached once the lower bound held.
    if _in_stream_window(now_utc):
        Variable.set(airflow_variable, 'START')
        return 'start_stream'
    return 'update_variable'
def set_airflow_variable(**context):
    """Set the Airflow Variable named by ``airflow_variable`` to 'STOP'.

    Args:
        **context: Airflow task context; not used.
    """
    stop_state = 'STOP'
    Variable.set(airflow_variable, stop_state)
# Controller DAG. The cron expression fires at minute 0 of hours 12 and 1
# every day; at most one run is active and missed intervals are not
# backfilled (catchup=False).
with DAG(
    dag_id='time_check',
    schedule_interval='0 12,1 * * *',
    max_active_runs=1,
    catchup=False,
    default_args=default_args,
) as dag:
    # Branch task: its callable returns the task id to follow
    # ('start_stream' or 'update_variable'). The name is rebound from the
    # function to the operator only after the call has captured the
    # function.
    check_current_time = BranchPythonOperator(
        task_id='check_current_time',
        python_callable=check_current_time,
        provide_context=True,
        dag=dag,
    )

    # Triggers a run of the STREAMING_TEST DAG.
    start_stream = TriggerDagRunOperator(
        task_id='start_stream',
        trigger_dag_id='STREAMING_TEST',
        dag=dag,
    )

    # Runs set_airflow_variable, which writes 'STOP' to the Variable.
    update_variable = PythonOperator(
        task_id='update_variable',
        python_callable=set_airflow_variable,
        provide_context=True,
        dag=dag,
    )

    # Notification mail sent after the Variable has been set to 'STOP'.
    stop_stream_email = EmailOperator(
        task_id='stop_stream_email',
        to='xxx@xxx.com',
        subject='Streaming DAG is OFF now',
        html_content="<p>Hi,<br><br>Turning streaming DAG to OFF state<br>",
        dag=dag,
    )

    # Two branches leave the branch task:
    #   check_current_time -> start_stream
    #   check_current_time -> update_variable -> stop_stream_email
    check_current_time.set_downstream(start_stream)
    check_current_time.set_downstream(update_variable)
    update_variable.set_downstream(stop_stream_email)
下面分享会自我触发的主 DAG(STREAMING_TEST)的代码:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.models import Variable
# Task-level defaults handed to the DAG constructor via ``default_args``.
default_args = dict(
    owner='owner',
    depends_on_past=False,
    start_date=datetime(2021, 3, 2, 10, 1),
    email=['xxx@xxx.com'],
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Key of the Airflow Variable read by check_airflow_variable below.
airflow_variable = 'stream_dag_status'
def check_airflow_variable(**context):
    """Branch callable that picks the next task from the status Variable.

    Reads the Airflow Variable named by ``airflow_variable`` and returns
    'sleep_task' when its value is 'START', otherwise 'email_notify'.

    Args:
        **context: Airflow task context; not used.

    Returns:
        str: 'sleep_task' or 'email_notify'.
    """
    current_state = Variable.get(airflow_variable)
    return 'sleep_task' if current_state == 'START' else 'email_notify'
# Self-triggering DAG. It has no schedule of its own
# (schedule_interval=None), so runs start only when triggered, either by
# another DAG or by its own 'trigger_self' task. At most one run is
# active at a time.
with DAG(
    'STREAMING_TEST',
    schedule_interval=None,
    max_active_runs=1,
    catchup=False,
    default_args=default_args,
) as dag:
    # Branch task: its callable returns 'sleep_task' while the status
    # Variable is 'START', otherwise 'email_notify'. The name is rebound
    # from the function to the operator only after the call has captured
    # the function.
    check_airflow_variable = BranchPythonOperator(
        task_id='check_airflow_variable',
        python_callable=check_airflow_variable,
        provide_context=True,
        dag=dag,
    )

    # Waits 60 seconds before the rest of the "keep going" branch runs.
    sleep_task = BashOperator(
        task_id='sleep_task',
        bash_command='sleep 60',
        dag=dag,
    )

    # No-op task between the sleep and the self-trigger. dag=dag is
    # passed explicitly to match the sibling tasks.
    start_group = DummyOperator(
        task_id='start_split',
        depends_on_past=False,
        dag=dag,
    )

    # Triggers a new run of this same DAG, which is what keeps the
    # check -> sleep -> trigger cycle repeating.
    dag_trigger = TriggerDagRunOperator(
        task_id='trigger_self',
        trigger_dag_id='STREAMING_TEST',
        dag=dag,
    )

    # Sent on the branch taken when the Variable is not 'START'.
    # The recipient previously contained two '@' characters
    # ('xxx@xxx@tegna.com'), which is not a valid address; it now uses
    # the same placeholder address as default_args['email'].
    email_notify = EmailOperator(
        task_id='email_notify',
        to='xxx@xxx.com',
        subject='Variable value is STOP',
        html_content="<p>Hi,<br><br>Streaming is stopped<br>",
        dag=dag,
    )

    # Two branches leave the branch task:
    #   check_airflow_variable -> sleep_task -> start_split -> trigger_self
    #   check_airflow_variable -> email_notify
    check_airflow_variable >> sleep_task >> start_group >> dag_trigger
    check_airflow_variable >> email_notify