【发布时间】:2021-05-28 00:59:11
【问题描述】:
我目前正在尝试自动化由不同脚本组成的数据管道。其中许多脚本依赖于名为DB_URL 的环境变量的设置。
在 python 脚本中,这个变量是通过os.getenv('DB_URL', None)读入的。
我可以通过使用 BashOperator 并在脚本执行前直接指定 DB_URL 来执行 DAG:
default_args = {
'owner': 'Hans Bambel',
'depends_on_past': False,
'email': ['hans-bambel@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='get_data_from_testdb',
default_args=default_args,
description='Gets some data from the DB specified in the connection "test_db"',
schedule_interval=None,
start_date=datetime(2021, 2, 24)
) as dag:
connection = BaseHook.get_connection("test_db")
db_url = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + \
'@' + str(connection.host) + ':' + str(connection.port) + '/' \
+ str(connection.schema)
test_db_call = BashOperator(
task_id='test_db_call',
bash_command=f'export DB_URL={db_url}; /path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
)
但我想为所有使用相同环境变量的脚本设置一次DB_URL:
with DAG(
dag_id='get_data_from_testdb',
default_args=default_args,
description='Gets some data from the DB specified in the connection "test_db"',
schedule_interval=None,
start_date=datetime(2021, 2, 24)
) as dag:
connection = BaseHook.get_connection("test_db")
db_url = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + '@' + str(connection.host) + ':' + str(connection.port) + '/' + str(connection.schema)
set_db_env = BashOperator(
task_id='set-dburl',
bash_command=f'export DB_URL={db_url}'
)
# activate_myenv = BashOperator(
# task_id='activate-conda-environment',
# bash_command='source activate myenv'
# )
test_db_call = BashOperator(
task_id='test_db_call',
bash_command=f'/path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
)
set_db-env >> test_db_call
另外,我想预先激活我的 conda 环境(由activate_myenv-task 准备),但是在将它添加到 DAG 时出现以下错误:
[2021-02-25 17:07:12,923] {bash.py:158} INFO - Running command: source activate carex
[2021-02-25 17:07:12,932] {bash.py:169} INFO - Output:
[2021-02-25 17:07:12,942] {bash.py:173} INFO - bash: activate: No such file or directory
[2021-02-25 17:07:12,943] {bash.py:177} INFO - Command exited with return code 1
我希望每个 DAG 都是独立运行的,但不是每个任务也是如此。因此,我希望我的第二个 DAG 也能正常工作。 有什么办法可以让它发挥作用吗?
【问题讨论】:
标签: python anaconda environment-variables airflow