【问题标题】:AirFlow: Setting environment variable and environment in AirFlow taskAirFlow:在 AirFlow 任务中设置环境变量和环境
【发布时间】:2021-05-28 00:59:11
【问题描述】:

我目前正在尝试自动化由不同脚本组成的数据管道。其中许多脚本依赖于名为DB_URL 的环境变量的设置。 在 python 脚本中,这个变量是通过os.getenv('DB_URL', None)读入的。

我可以通过使用 BashOperator 并在脚本执行前直接指定 DB_URL 来执行 DAG:

default_args = {
'owner': 'Hans Bambel',
'depends_on_past': False,
'email': ['hans-bambel@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='get_data_from_testdb',
    default_args=default_args,
    description='Gets some data from the DB specified in the connection "test_db"',
    schedule_interval=None,
    start_date=datetime(2021, 2, 24)
) as dag:
    connection = BaseHook.get_connection("test_db")
    db_url = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + \
             '@' + str(connection.host) + ':' + str(connection.port) + '/' \
             + str(connection.schema)

    test_db_call = BashOperator(
        task_id='test_db_call',
        bash_command=f'export DB_URL={db_url}; /path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
    )

但我想为所有使用相同环境变量的脚本设置一次DB_URL

with DAG(
        dag_id='get_data_from_testdb',
        default_args=default_args,
        description='Gets some data from the DB specified in the connection "test_db"',
        schedule_interval=None,
        start_date=datetime(2021, 2, 24)
    ) as dag:
        connection = BaseHook.get_connection("test_db")
        db_url = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + '@' + str(connection.host) + ':' + str(connection.port) + '/' + str(connection.schema)

        set_db_env = BashOperator(
            task_id='set-dburl',
            bash_command=f'export DB_URL={db_url}'
        )

        # activate_myenv = BashOperator(
        #     task_id='activate-conda-environment',
        #     bash_command='source activate myenv'
        # )

        test_db_call = BashOperator(
            task_id='test_db_call',
            bash_command=f'/path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
        )
        set_db-env >> test_db_call

另外,我想预先激活我的 conda 环境(由activate_myenv-task 准备),但是在将它添加到 DAG 时出现以下错误:

[2021-02-25 17:07:12,923] {bash.py:158} INFO - Running command: source activate carex
[2021-02-25 17:07:12,932] {bash.py:169} INFO - Output:
[2021-02-25 17:07:12,942] {bash.py:173} INFO - bash: activate: No such file or directory
[2021-02-25 17:07:12,943] {bash.py:177} INFO - Command exited with return code 1

我希望每个 DAG 都是独立运行的,但不是每个任务也是如此。因此,我希望我的第二个 DAG 也能正常工作。 有什么办法可以让它发挥作用吗?

【问题讨论】:

    标签: python anaconda environment-variables airflow


    【解决方案1】:

    当您对 unix shell 的环境进行任何更改时,例如创建一个新的 变量,该更改会向下传播到子进程,但永远不会传播到子进程 外壳的父级。这就是 Unix shell 的工作原理。

    因此,当您的 BashOperator 执行“export DB_URL=...”时,会更改 shell 正在运行 set_db_env 任务,但一旦该任务完成,它的 修改后的环境消失了,您创建的 DB_URL 变量也随之消失。下一个任务 test_db_call 继承了 set_db_env 开始的相同环境 与,而不是它改变的那个。

    要使 DB_URL 可用于所有脚本,您可以在之前定义它 运行气流进程,通常在运行的用户的.bashrc 文件中 气流过程。这样你的 shell 脚本就可以直接访问 变量的值以通常的方式。

    或者您可以使用 Airflow 的“变量”:在 Airflow UI 中,菜单 Admin / Variables,定义 key=DB_URL,设置值并保存。然后你可以使用 Concepts docs 中描述的机制 神社模板:

    bash_command='echo {{ var.value.DB_URL }}'
    

    使用双括号语法访问变量的值。

    【讨论】:

    • 感谢您的回答。我也有同样的期望,但我认为 DAG 的所有任务都在同一个环境中运行,因此当前面的任务修改它时,后续任务能够使用修改后的环境。但是当任务并行运行时,这可能会出现问题(即使这应该是设置 DAG 的人的问题)。我想要灵活并动态地使用 AirFlow 来定义变量,因此事先设置 env 是不可取的。第二个命题基本上已经在我上面的代码中完成了,正如你在 Hook for security 中看到的那样。
    猜你喜欢
    • 2018-11-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-17
    • 2018-07-13
    • 2017-02-07
    相关资源
    最近更新 更多