【问题标题】:How to use jinja template in Airflow MySQL Operator如何在 Airflow MySQL Operator 中使用 jinja 模板
【发布时间】:2021-10-21 07:43:51
【问题描述】:

我目前正在 Airflow 的 MySQLOperator 中运行此查询。 如何使用 Jinja 模板将 region、s3 bucket 替换为参数?

  • 气流版本:2.0.2
  • Python:3.7
sql = """SELECT * FROM test
INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
  TERMINATED BY ','
  OPTIONALLY ENCLOSED BY '"'
LINES
  TERMINATED BY '\\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
  task_id="mysql_to_s3",
  dag=dag,
  sql=rds_sql,
  mysql_conn_id=MYSQL_CONN_ID,
  parameters={
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
  },
  autocommit=False,
  database="test",
)

【问题讨论】:

    标签: airflow airflow-2.x


    【解决方案1】:

    您可以使用参数将动态值传递给您的 SQL:

    sql = """SELECT * FROM test
    INTO OUTFILE S3 '{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'
    CHARACTER SET utf8
    FORMAT CSV HEADER
    FIELDS
      TERMINATED BY ','
      OPTIONALLY ENCLOSED BY '"'
    LINES
      TERMINATED BY '\\n'
    OVERWRITE ON;
    """
    
    mysql_to_s3 = MySqlOperator(
      task_id="mysql_to_s3",
      dag=dag,
      sql=sql,
      mysql_conn_id=MYSQL_CONN_ID,
      params={
        "s3_bucket": "my-s3-bucket",
        "s3_key_prefix": "my-key",
        "region": "ap-southeast-1",
      },
      autocommit=False,
      database="test",
    )
    

    如果值存储在 Airflow 变量中(regions3_buckets3_key_prefix),那么您可以从运算符中删除 params 字典并将您的 sql 更改为:

    INTO OUTFILE S3 '{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'
    

    在这两个选项中,Airflow 将模板化 sql 字符串并在执行运算符时将占位符替换为值。您可以在任务渲染选项卡中查看实际值。

    【讨论】:

    • 为什么是{{ region }} 而不是{{ params.region }}
    • @YoheiOnishi 已修复。
    【解决方案2】:
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-27
    • 2021-07-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多