【问题标题】:Airflow scheduling, re-run failed task气流调度,重新运行失败的任务
【发布时间】:2023-03-30 07:12:01
【问题描述】:

虽然我已经能够为读取(数据存储)和写入(大查询)编写代码,但我对气流还很陌生。 我无法安排我的工作/任务。 我想每 2 小时运行一次作业,从源中读取前 2 小时的数据。 现在,如果任务失败,我想手动重试它,但在特定的 2 小时内,它本来是要运行的。我如何做到这一点? 我想到了几件事:

  • 存储作业 ID 和计划运行的时间范围。在重试时,我可以从 sqlite 或其他数据库中读取它。
  • Airflow 有一个内置变量,它指向该作业的预期执行时间,我可以在我的代码中使用它。

我应该考虑其他选择吗?或以上任何一种?

【问题讨论】:

  • 您能否发布您的 DAG 脚本的 a minimal example 并解释其中的不符合您预期的内容?

标签: google-cloud-platform google-bigquery google-cloud-datastore airflow airflow-scheduler


【解决方案1】:

我假设您在任务中使用当前日期时间(例如 now()),是吗?

最好的做法是在 Airflow Context 中使用 execution_date 的值,而不是在您的操作员中调用 datetime.now(),因为即使您重新执行 DAG,计划作业的 execution_date 也不会更改/任务。

【讨论】:

  • 是的,我想出来了,我希望应该有一些基于上下文的内部变量。谢谢
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-11-14
  • 2017-08-24
  • 2019-12-31
  • 1970-01-01
  • 2021-08-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多