【发布时间】:2021-09-12 01:50:57
【问题描述】:
我有一个 dag,它按小时收集数据而没有追赶。现在我想从 6 月 1 日开始赶上进度:
dag = DAG(dag_id='test',
start_date=datetime(2021, 6, 1),
description='Airflow Test Run',
schedule_interval="15 0 * * *", # Hourly at minute 15
max_active_runs=1,
catchup=True,
default_args=default_args
)
以及涉及的 Vertica SQL(catchup=False 的初始版本):
INSERT INTO table1
SELECT * FROM table2
WHERE
-- it means "Load data from last hour"
eventTimestamp >= DATE_TRUNC('HOUR', NOW()) - interval '1 hour'
AND eventTimestamp < DATE_TRUNC('HOUR', NOW())
文件表明:
具有 start_date、可能是 end_date 和 schedule_interval 的 Airflow DAG 定义了一系列间隔,调度程序将这些间隔变成单独的 DAG 运行和执行。默认情况下,调度程序将在自上次执行日期以来未运行(或已被清除)的任何时间间隔内启动 DAG 运行。这个概念称为 Catchup。
但是,我不知道如何编写满足此要求的 SQL 查询。我知道我可以将execute_date 传递给查询,但这不是每天追赶,而是每小时追赶。 AFAIK 我们没有像 execution_hour 这样的东西。
如何在没有太多技巧的情况下实现这一目标?谢谢!
【问题讨论】: