【Question title】: if statement in Airflow templated field
【Posted】: 2021-04-07 21:54:41
【Question description】:

I have a DAG that runs a few Snowflake operators, and in the SQL files I template the dates like this:

'{{ prev_execution_date.subtract(minutes=15).in_tz('America/Toronto').to_datetime_string() }}'
'{{ execution_date.in_tz('America/Toronto').to_datetime_string() }}'

This all works fine.
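For reference, the window those two expressions produce can be reproduced with the standard library. This is a sketch, not Airflow's code: it swaps pendulum's subtract/in_tz/to_datetime_string for timedelta, astimezone and strftime, assumes Python 3.9+ with system time zone data, and window_bounds is a hypothetical helper name. The sample dates assume a daily schedule like the one in the answer below, whose scheduled run logs 2021-04-05 19:45:00 as its start value.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

# Assumes Python 3.9+ and a system time zone database that includes America/Toronto.
TORONTO = ZoneInfo("America/Toronto")
# pendulum's to_datetime_string() produces "YYYY-MM-DD HH:mm:ss".
DATETIME_STRING = "%Y-%m-%d %H:%M:%S"


def window_bounds(prev_execution_date: datetime, execution_date: datetime) -> tuple[str, str]:
    """Hypothetical helper mirroring the two templated expressions (inputs are tz-aware UTC)."""
    # prev_execution_date.subtract(minutes=15).in_tz('America/Toronto').to_datetime_string()
    start = (prev_execution_date - timedelta(minutes=15)).astimezone(TORONTO)
    # execution_date.in_tz('America/Toronto').to_datetime_string()
    end = execution_date.astimezone(TORONTO)
    return start.strftime(DATETIME_STRING), end.strftime(DATETIME_STRING)


if __name__ == "__main__":
    # Daily schedule: the run for 2021-04-07 has prev_execution_date 2021-04-06 (UTC).
    print(window_bounds(
        datetime(2021, 4, 6, tzinfo=timezone.utc),
        datetime(2021, 4, 7, tzinfo=timezone.utc),
    ))  # ('2021-04-05 19:45:00', '2021-04-06 20:00:00')
```

Toronto is on daylight time (UTC-4) on these dates, which is why 23:45 UTC becomes 19:45 local.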

I also want to be able to trigger the DAG manually and pass in the dates, so I tried the following in the query file:

{{ dag_run.conf['startdate'] if dag_run else prev_execution_date.subtract(minutes=15).in_tz('America/Toronto').to_datetime_string()  }}

(basically the solution from this question)

This works fine for manually triggered runs where I pass in a startdate value, but for scheduled runs the else clause always comes back blank.

Am I missing something in the else clause, or is there a different solution that avoids the if statement entirely?

I'm using Airflow 1.10.12.

Thanks!

【Question discussion】:

    Tags: python jinja2 airflow


    【Solution 1】:

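    This happens because the dag_run object always exists, whether the run was triggered manually or by the scheduler. The condition "if dag_run" is therefore always true, so a scheduled run also takes the dag_run.conf['startdate'] branch; with no conf passed, that lookup is undefined and Jinja renders it as an empty string. Branch on dag_run.external_trigger instead, which is true only for externally (manually) triggered runs: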

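    from datetime import datetime
    
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    
    dag = DAG(
        dag_id="trigger_dag",
        start_date=datetime(2021, 4, 5),
        catchup=True,
        schedule_interval='@daily',
    )
    with dag:
        op = PythonOperator(
            task_id='a',
            # op_kwargs are passed to the callable as keyword arguments,
            # so the parameter name must match the "value" key below
            python_callable=lambda value: print(value),
            op_kwargs={
                # op_kwargs is a templated field, so this is rendered before the call:
                # manual runs use conf['startdate'], scheduled runs the computed start
                "value": "{{ dag_run.conf['startdate'] if dag_run.external_trigger else prev_execution_date.subtract(minutes=15).in_tz('America/Toronto').to_datetime_string() }}"
            },
        )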
    

    Here is the output of a scheduled run, where the value is the adjusted previous execution date:

    [2021-04-08 05:46:53,430] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): a> on 2021-04-07T00:00:00+00:00
    [2021-04-08 05:46:53,434] {standard_task_runner.py:54} INFO - Started process 2604 to run task
    [2021-04-08 05:46:53,462] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'trigger_dag', 'a', '2021-04-07T00:00:00+00:00', '--job_id', '35', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/trigger.py', '--cfg_path', '/tmp/tmpe8zsnj8x']
    [2021-04-08 05:46:53,463] {standard_task_runner.py:78} INFO - Job 35: Subtask a
    [2021-04-08 05:46:53,509] {logging_mixin.py:112} INFO - Running <TaskInstance: trigger_dag.a 2021-04-07T00:00:00+00:00 [running]> on host 953a6668d603
    [2021-04-08 05:46:53,564] {logging_mixin.py:112} INFO - 2021-04-05 19:45:00
    [2021-04-08 05:46:53,565] {python_operator.py:114} INFO - Done. Returned value was: None
    [2021-04-08 05:46:53,574] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=trigger_dag, task_id=a, execution_date=20210407T000000, start_date=20210408T054653, end_date=20210408T054653
    [2021-04-08 05:46:58,385] {local_task_job.py:102} INFO - Task exited with return code 0
    

    And here is the output of a manually triggered run, where the value is the one passed in through conf:

    [2021-04-08 05:47:47,431] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): a> on 2021-04-08T05:47:31.333405+00:00
    [2021-04-08 05:47:47,434] {standard_task_runner.py:54} INFO - Started process 2665 to run task
    [2021-04-08 05:47:47,463] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'trigger_dag', 'a', '2021-04-08T05:47:31.333405+00:00', '--job_id', '36', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/trigger.py', '--cfg_path', '/tmp/tmpwoshw679']
    [2021-04-08 05:47:47,464] {standard_task_runner.py:78} INFO - Job 36: Subtask a
    [2021-04-08 05:47:47,512] {logging_mixin.py:112} INFO - Running <TaskInstance: trigger_dag.a 2021-04-08T05:47:31.333405+00:00 [running]> on host 953a6668d603
    [2021-04-08 05:47:47,564] {logging_mixin.py:112} INFO - super-duper
    [2021-04-08 05:47:47,564] {python_operator.py:114} INFO - Done. Returned value was: None
    [2021-04-08 05:47:47,574] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=trigger_dag, task_id=a, execution_date=20210408T054731, start_date=20210408T054747, end_date=20210408T054747
    [2021-04-08 05:47:52,388] {local_task_job.py:102} INFO - Task exited with return code 0
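The following sketch reproduces the behaviour outside Airflow with plain jinja2, so the conditions can be compared side by side. It assumes jinja2 is installed and that Airflow uses Jinja's default (non-strict) undefined handling; SimpleNamespace objects stand in for Airflow's DagRun, and the FALLBACK string stands in for the rendered prev_execution_date expression (the value from the scheduled run logged above). The third template, which uses Jinja's built-in default filter and needs no if at all, is a hypothetical alternative that has not been checked against Airflow 1.10.12 itself.

```python
from types import SimpleNamespace

from jinja2 import Environment

# Hypothetical stand-ins for Airflow's DagRun: only the attributes the templates read.
scheduled_run = SimpleNamespace(conf=None, external_trigger=False)
manual_run = SimpleNamespace(conf={"startdate": "super-duper"}, external_trigger=True)
manual_run_no_conf = SimpleNamespace(conf={}, external_trigger=True)

# Stand-in for the rendered prev_execution_date...to_datetime_string() expression.
FALLBACK = "2021-04-05 19:45:00"

# Condition from the question: any DagRun object is truthy, so the else never runs.
ORIGINAL = "{{ dag_run.conf['startdate'] if dag_run else fallback }}"
# Condition from the answer: branch on whether the run was triggered externally.
FIXED = "{{ dag_run.conf['startdate'] if dag_run.external_trigger else fallback }}"
# Hypothetical alternative: a failed lookup yields Undefined, which default() replaces.
DEFAULTED = "{{ dag_run.conf['startdate'] | default(fallback) }}"

# The default Environment uses jinja2.Undefined, which renders missing values as "".
env = Environment()


def render(source: str, run: SimpleNamespace) -> str:
    """Render one template string against a fake dag_run and the fallback value."""
    return env.from_string(source).render(dag_run=run, fallback=FALLBACK)


if __name__ == "__main__":
    templates = [("original", ORIGINAL), ("fixed", FIXED), ("defaulted", DEFAULTED)]
    runs = [("scheduled", scheduled_run), ("manual", manual_run), ("manual, no conf", manual_run_no_conf)]
    for name, source in templates:
        for label, run in runs:
            print(f"{name:9} | {label:15} | {render(source, run)!r}")
```

Note that the external_trigger check still renders blank for a manual trigger whose conf omits startdate, whereas the default-filter variant falls back to the computed date in that case as well.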
    

    【Comments】:

    • Ah. Thanks for the thorough explanation!