【问题标题】:Airflow ExternalTaskSensor manually triggeredAirflow ExternalTask​​Sensor 手动触发
【发布时间】:2021-01-21 18:41:09
【问题描述】:

我创建了两个要手动触发的 dag schedule_interval=None

首先,我触发了“External_test_A”,它应该处于挂起状态,直到“External_test_B”不会被触发。过了一会儿(2分钟后)我触发了“External_test_B”DAG,它只运行一个任务first_task

当任务:first_task 成功,则来自“External_test_A”的Poking for External_test_B.first_task 应该完成,返回成功并运行来自“External_test_A”的second_task 任务。

我陷入了这样一种情况,即使first_task 成功,来自“External_test_A”的Poking for ... 仍在继续。

External_test_A:

default_args = {
    'owner': 'airflow',
    'start_date': pendulum.yesterday().astimezone('US/Eastern')
}

dag = DAG(
    dag_id='External_test_A',
    default_args=default_args,
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

sensor = ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='External_test_B',
    external_task_id='first_task',
    execution_delta=timedelta(minutes=3),
    dag=dag)

t2 = PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)

sensor >> t2

if __name__ == "__main__":
    dag.cli()

External_test_B:

default_args = {
    'owner': 'airflow',
    'start_date': pendulum.yesterday().astimezone('US/Eastern')
}

dag = DAG(
    dag_id='External_test_B',
    default_args=default_args,
    schedule_interval=None
)

t1 = DummyOperator(task_id='first_task', dag=dag)

t1

if __name__ == "__main__":
    dag.cli()

你们中的一些人能告诉我我做错了什么吗?如何仅使用手动触发解决来自两个不同 DAG 的两个任务之间的通信问题?

【问题讨论】:

标签: python airflow


【解决方案1】:

经过大量研究,除非您编写自己的类来完成这项工作,否则无法通过手动触发来做到这一点。两个 dag 必须有一个具有相同开始日期的计划间隔,并且您需要在外部传感器定义中提及 execution_delta。

【讨论】:

  • 您提到的另一篇帖子因未提供答案而被删除。我编辑了这个以使其尽可能多地成为答案。请尝试进一步改进,并尽可能按照How to Answer 的建议进行改进。顺便说一句,要求其他人添加赏金并不是真正的工作原理。请拨打tour
  • 如果您有新问题,请点击 按钮提出问题。如果有助于提供上下文,请包含指向此问题的链接。 - From Review
【解决方案2】:

由于您的两个 dag 都是手动触发的,因此无法解决您的问题。 我建议你尝试在一个 DAG 中完成...

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-06-05
  • 1970-01-01
  • 1970-01-01
  • 2018-06-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多