【问题标题】:Airflow how to catch up on hourly basis气流如何按小时赶上
【发布时间】:2021-09-12 01:50:57
【问题描述】:

我有一个 dag,它按小时收集数据而没有追赶。现在我想从 6 月 1 日开始赶上进度:

dag = DAG(dag_id='test',
          start_date=datetime(2021, 6, 1),
          description='Airflow Test Run',
          schedule_interval="15 0 * * *",   # Hourly at minute 15
          max_active_runs=1,
          catchup=True,
          default_args=default_args
          )

以及涉及的 Vertica SQL(catchup=False 的初始版本):

INSERT INTO table1
SELECT * FROM table2 
WHERE
    -- it means "Load data from last hour"
    eventTimestamp >= DATE_TRUNC('HOUR', NOW()) - interval '1 hour'
    AND eventTimestamp < DATE_TRUNC('HOUR', NOW())

文件表明:

具有 start_date、可能是 end_date 和 schedule_interval 的 Airflow DAG 定义了一系列间隔,调度程序将这些间隔变成单独的 DAG 运行和执行。默认情况下,调度程序将在自上次执行日期以来未运行(或已被清除)的任何时间间隔内启动 DAG 运行。这个概念称为 Catchup。

但是,我不知道如何编写满足此要求的 SQL 查询。我知道我可以将execute_date 传递给查询,但这不是每天追赶,而是每小时追赶。 AFAIK 我们没有像 execution_hour 这样的东西。

如何在没有太多技巧的情况下实现这一目标?谢谢!

【问题讨论】:

    标签: python airflow vertica


    【解决方案1】:

    首先,您应该使用 ts 气流宏而不是使用 NOW() 函数。使用此宏,您始终可以在气流的确切执行日期重新运行 DAG。你可以阅读更多关于气流的宏here

    其次,要回填6月1日以后的数据,可以使用这条SQL

    INSERT INTO table1
    SELECT * FROM table2 
    WHERE
        eventTimestamp >= TIMESTAMP '2021-06-01 00:00:00' 
        AND eventTimestamp < TIMESTAMP [the timestamp which you already have data]
    

    【讨论】:

      【解决方案2】:

      您需要使用与您的用例相关的 catchup=False 和 Airflow macros

      我相信您正在寻找的是:

      WHERE eventTimestamp >= {{ execution_date }} AND {{ next_execution_date }}
      

      这意味着: 在将在 2021-06-30 04:00 开始的运行中(执行日期为 2021-06-30 03:00 ),查询将是:

      WHERE eventTimestamp >= '2021-06-30 03:00' AND '2021-06-30 04:00'
      

      使用execution_datecatchup=False 意味着在停机时间为 3 小时的情况下 - 一旦 Airflow 恢复,它将安排 3 次运行 - 每次运行将处理 1 小时的数据(就像如果有完全没有停机时间)

      请注意,您可以根据您使用的数据库的要求更改时间戳的格式。

      {{ execution_date }} 会给你2021-06-30T07:30:16.365941+00:00

      {{ execution_date.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] }} 会给2021-06-29 07:30:16.365

      【讨论】:

        猜你喜欢
        • 2021-04-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-04-03
        • 1970-01-01
        • 2020-10-13
        • 1970-01-01
        相关资源
        最近更新 更多