【问题标题】:Airflow and Templates reference and PostgresHook气流和模板参考和 PostgresHook
【发布时间】:2023-01-18 23:20:44
【问题描述】:

我有个问题 我想使用模板参考 - {{ds}} 在 PostgresOperator 中替换时,一切正常(我猜是这样) 并且 PostgresHook 不想工作

 def prc_mymys_update(procedure: str, type_agg: str):
    with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
        with conn.cursor() as cur:
            with open(URL_YML_2,"r", encoding="utf-8") as f:
                ya_2 = yaml.safe_load(f)
                yml_mymts_2 = ya_2['type_agg']
                query_pg = ""
                if yml_mymts_2[0]['type_agg_name'] == "day" and type_agg == "day":
                    sql_1 = yml_mymts_2[0]['sql']
                    query_pg = f"""{sql_1}"""
                elif yml_mymts_2[1]['type_agg_name'] == "retention" and type_agg == "retention":
                    sql_2 = yml_mymts_2[1]['sql']
                    query_pg = f"""{sql_2}"""
                elif yml_mymts_2[2]['type_agg_name'] == "mau" and type_agg == "mau":
                    sql_3 = yml_mymts_2[2]['sql']
                    query_pg = f"""{sql_3}"""
                cur.execute(query_pg)
                dates_list = cur.fetchall()
                for date_res in dates_list:
                    cur.execute(
                        "select from {}(%(date)s::date);".format(procedure),
                        {"date": date_res[0].strftime("%Y-%m-%d")},
                    )
    conn.close()

我用yml

type_agg:
  - type_agg_name: day
    sql: select calendar_date from entertainment_dds.v_calendar where calendar_date between '{{ds}}'::date - interval '7 days' and '{{ds}}'::date - 1 order by 1 desc
  - type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
  - type_agg_name: mau
    sql: select dt::date date_ from generate_series('{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '7 days', '{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '1 days', interval '1 days') dt order by 1 asc

当我运行一个 dag 时,会出现一个使用特定任务的时刻

- type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc

我错了

psycopg2.errors.UndefinedColumn:列“y”不存在 第 1 行: ...((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}...

enter image description here

我试图找到有关 Templates reference 和 PostgresHook 交互的信息,但一无所获

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference

【问题讨论】:

    标签: python postgresql airflow


    【解决方案1】:

    这是意料之中的。 templated_fields 是 Airflow 中的一个 attribute of the BaseOperator,所有运算符都从中继承。这是在使用 PostgresOperator 时传入 Jinja 表达式的效果很好。

    如果您需要编写自定义任务,则需要显式呈现模板值。像这样,未经测试,但我相信这可以在你的函数中推断出来:

    def prc_mymys_update(procedure: str, type_agg: str, ti):
        ti.render_templates()
    
        with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
            with conn.cursor() as cur:
                ...
    

    ti kwargs 代表 Airflow 任务实例,可以作为推送到 Airflow 中每个任务的执行上下文的一部分直接访问。该对象有一个 render_templates() method,它将 Jinja 表达式转换为一个值。

    如果 PostgresOperator 不符合您的需求,您可以随时对运算符进行子类化并相应地对其进行调整。

    此外,sql 字符串本身有单引号,这会导致字符串解析问题,如您所见: '{{execution_date.strftime('%Y-%m-%d')}}' 应该是这样的: '{{execution_date.strftime("%Y-%m-%d")}}'

    【讨论】:

      【解决方案2】:

      请注意以下查询中的单引号:

      sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
      

      具体来说,这部分:

      '{{execution_date.strftime('%Y-%m-%d')}}'
      

      这里有两个单独的字符串,以日期格式分隔。这是第一个字符串:

      '{{execution_date.strftime('
      

      这会导致日期格式单独呈现。如果将日期格式用双引号而不是单引号括起来,应该可以解决此错误。例如:

      sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime("%Y-%m-%d")}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
      

      请注意,如果 RDBMS 中的双引号用于其他目的,您可能需要交换双引号和单引号,例如:

      "{{execution_date.strftime('%Y-%m-%d')}}"
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-05-02
        • 2020-12-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-01-31
        • 1970-01-01
        相关资源
        最近更新 更多