【发布时间】:2020-08-21 17:07:17
【问题描述】:
我正在安排气流作业。但是,为了验证我是否安排了正确的作业,我需要查看它将来何时运行。
Airflow 具有以下命令,可以让我进行下一次运行。但是,这对于某些用例来说还不够。例如,我安排了每隔一个星期五运行一次作业。如何验证。
airflow next_execution <dag_id>
有没有办法,我可以获得这个 dag 运行的所有未来日期。或至少几个?
【问题讨论】:
我正在安排气流作业。但是,为了验证我是否安排了正确的作业,我需要查看它将来何时运行。
Airflow 具有以下命令,可以让我进行下一次运行。但是,这对于某些用例来说还不够。例如,我安排了每隔一个星期五运行一次作业。如何验证。
airflow next_execution <dag_id>
有没有办法,我可以获得这个 dag 运行的所有未来日期。或至少几个?
【问题讨论】:
虽然大多数进程使用croniter,但如果您有权访问您的安装,最好通过现有接口从“源”获取信息:
from airflow import models
from datetime import datetime, timedelta
dag_bag = models.DagBag()
dag_id = "dag_name"
dag = dag_bag.get_dag(dag_id)
now = datetime.now()
until = now + timedelta(days=21)
runs = dag.get_run_dates(start_date=now, end_date=until)
print(runs)
【讨论】:
from sshtunnel import SSHTunnelForwarder ImportError: No module named sshtunnel
sshtunnel - 创建一个带有跟踪和详细信息的新问题以获取帮助。
[root@server airflow]# pip freeze | grep -i sshtunnel sshtunnel==0.1.5
Airflow 在钩子 croniter 下使用,用于example。按照croniter 文档中的示例,这可以按如下方式工作(例如,假设 dag 在每个星期五的下午 12 点运行,并且我们的基准日期是昨天的 8 月 20 日)。
from croniter import croniter
from datetime import datetime
# Specify current date
base = datetime(2020, 8, 20, 0, 0)
# Set croniter
iter = croniter('0 12 * * 5', base)
# Get next execution
iter.get_next(datetime)
>>>
datetime.datetime(2020, 8, 21, 12, 0)
您可以在其中指定base 作为您的 dag (dag.latest_execution_date) 的最新执行日期。您可以通过调用 n 次 iter.get_next(datetime) 来获取它的以下执行情况。
【讨论】: