【问题标题】:use Airflow connection from a jinja template使用 jinja 模板中的 Airflow 连接
【发布时间】:2021-01-21 11:25:53
【问题描述】:

我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用来自 Jinja 模板的连接。

所以我正在寻找类似于变量的东西

echo {{ var.value.<variable_name> }}

【问题讨论】:

    标签: airflow


    【解决方案1】:

    对于 Airflow >= 2.2.0

    假设您有 conn id test_conn,您可以通过以下方式直接使用宏:

    {{ conn.test_conn }} 所以你可以得到任何连接属性,比如:

    {{ conn.test_conn.host }}{{ conn.test_conn.login }}{{ conn.test_conn.password }} 等等。

    对于气流 :

    没有现成的宏,但是您可以创建自定义宏来解决这个问题。

    连接示例:

    创建宏:

    def get_host(conn_id):
        connection = BaseHook.get_connection(conn_id)
        return connection.host
    
    def get_schema(conn_id):
        connection = BaseHook.get_connection(conn_id)
        return connection.schema
    
    def get_login(conn_id):
        connection = BaseHook.get_connection(conn_id)
        return connection.login
    

    在 DAG 中使用它们:

    def print_function(**context):
        print(f"host={context['host']} schema={context['schema']} login={context['login']}")
    
    user_macros = {
        'get_host': get_host,
        'get_schema': get_schema,
        'get_login': get_login,
    }
    
    with DAG(
        dag_id='connection',
        default_args=default_args,
        schedule_interval=None,
        user_defined_macros=user_macros,
    ) as dag:
    
    # Example how to use as function
    python_op = PythonOperator( 
        task_id='python_task',
        provide_context=True,
        python_callable=print_function,
        op_kwargs={
            'host': get_host("test_conn"),
            'schema': get_schema("test_conn"),
            'login': get_login("test_conn"),
        }
    )
    
    # Example how to use as Jinja string
    bash_op = BashOperator( 
        task_id='bash_task',
        bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
    )
    

    PythonOperator 的渲染示例:

    BashOperator 渲染示例:

    一般说明: 这段代码所做的是创建一个自定义函数func() 用作user_defined_macros,从而提供使用它的能力,就像这个宏是由Airflow 本身定义的一样。 您可以通过以下方式访问模板:{{ func() }},如示例中函数允许接受参数所示。

    注意您可以为连接对象中的所有字段创建此类函数。

    谨慎使用它,将密码作为文本传递可能不是一个好主意。

    【讨论】:

      【解决方案2】:

      我的 PR 添加了 {{ conn.my_conn_id.login }} 语法,它将在气流 2.2.0 中可用(尚未发布截至 2021 年 9 月 22 日)。

      在此处查看templates reference 的未发布文档

      对于 2.1.4 及更早版本:

      改进以前的答案,

      为每个 DAG 定义宏:{{conn.&lt;conn_id&gt;}}

      您可以使用以下宏获得conn.&lt;connection_name&gt;.host 语法:

      class ConnectionGrabber:
          def __getattr__(self, name):
              return  Connection.get_connection_from_secrets(name)
      dag = DAG(user_defined_macros={'connection': ConnectionGrabber()}
      
      

      将名称 connection 注入到 jinja 模板上下文中,connection 是一个 ConnectionGrabber 实例。此ConnectionGrabber 提供动态/托管属性,因此当您请求属性my_conn_id(如connection.my_conn_id)时,它将执行使用airflow.models.Connection.get_connection_from_secrets 的查找并返回,从那里您可以使用a.m.Connection 属性,如hostloginpassword

      在 jinja 模板中使用bash_command='echo {{connection.mssql.host }}' 访问连接mssql 的完整示例:

      from airflow.models import DAG,Connection
      from airflow.operators.bash import BashOperator
      from airflow.utils.dates import days_ago
      
      
      class ConnectionGrabber:
          def __getattr__(self, name):
              return  Connection.get_connection_from_secrets(name)
      
      
      args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
      
      dag = DAG(
          dag_id='test_connection',
          default_args=args,
          schedule_interval='0 0 * * *',
          dagrun_timeout=timedelta(minutes=60),
          user_defined_macros={'connection': ConnectionGrabber()}
      )
      
      task = BashOperator(task_id='read_connection', bash_command='echo {{connection.mssql.host }}', dag=dag)
      

      在插件中定义宏:{{macros.conn.value.&lt;conn_id&gt;}}

      如果您想在所有 DAG 中使用此宏,您可以将其包装在插件中,如下所示:

      # $AIRFLOW_HOME/plugins/connection_macro.py
      from airflow.plugins_manager import AirflowPlugin
      from airflow.models import Connection
      
      
      class ConnectionGrabber:
          __name__ = "value"
          def __str__(self):
              return self.__name__
          def __getattr__(self, name):
              return  Connection.get_connection_from_secrets(name)
      
      class MacrosPlugin(AirflowPlugin):
          name = "conn"
          macros = [ConnectionGrabber()]
      

      检查插件是否可以加载airflow plugins

      airflow plugins
      name | source                    | macros
      =====+===========================+=======
      conn | $PLUGINS_FOLDER/macros.py | value
      

      然后你可以像这样在你的神社模板中使用{{ macros.conn.value.&lt;conn_id&gt;.host }}

         task = BashOperator(task_id='read_connection', bash_command='echo macros.conn.value.mssql.host = {{macros.conn.value.mssql.host }}', dag=dag)
      

      我还打开了一个issue 以原生添加conn.value.&lt;conn_id&gt; 语法

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-07-09
        • 2023-01-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多