【发布时间】:2021-01-21 18:41:09
【问题描述】:
我创建了两个要手动触发的 dag schedule_interval=None。
首先,我触发了“External_test_A”,它应该处于挂起状态,直到“External_test_B”不会被触发。过了一会儿(2分钟后)我触发了“External_test_B”DAG,它只运行一个任务first_task。
当任务:first_task 成功,则来自“External_test_A”的Poking for External_test_B.first_task 应该完成,返回成功并运行来自“External_test_A”的second_task 任务。
我陷入了这样一种情况,即使first_task 成功,来自“External_test_A”的Poking for ... 仍在继续。
External_test_A:
default_args = {
'owner': 'airflow',
'start_date': pendulum.yesterday().astimezone('US/Eastern')
}
dag = DAG(
dag_id='External_test_A',
default_args=default_args,
schedule_interval=None
)
def do_second_task():
print('Second task is done')
sensor = ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='External_test_B',
external_task_id='first_task',
execution_delta=timedelta(minutes=3),
dag=dag)
t2 = PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
sensor >> t2
if __name__ == "__main__":
dag.cli()
External_test_B:
default_args = {
'owner': 'airflow',
'start_date': pendulum.yesterday().astimezone('US/Eastern')
}
dag = DAG(
dag_id='External_test_B',
default_args=default_args,
schedule_interval=None
)
t1 = DummyOperator(task_id='first_task', dag=dag)
t1
if __name__ == "__main__":
dag.cli()
你们中的一些人能告诉我我做错了什么吗?如何仅使用手动触发解决来自两个不同 DAG 的两个任务之间的通信问题?
【问题讨论】:
-
看起来你不能手动触发子 dag,根据luminousmen.com/post/airflow-dag-dependencies