【发布时间】:2019-12-14 11:00:27
【问题描述】:
我目前正在我的 DAG 中创建一个引擎,并将这个 sqlalchemy 引擎作为参数传递给 PythonOperator 以执行一些数据库工作。例如
PythonOperator(python_callable=my_callable,
op_args=[engine],
provide_context=True,
task_id = 'my_task',
dag=dag)
当我尝试清除任务状态时,我得到一个错误
File "/opt/conda/lib/python3.7/copy.py", line 169, in deepcopy
rv = reductor(4)
TypeError: can't pickle _thread._local objects
这很可能是因为您无法腌制引擎对象:
pickle.dumps(engine)
TypeError: can't pickle _thread._local objects
我想知道是否有解决此问题的好方法,以便我可以有效地使用 Airflow 网络服务器。我需要通过 python 可调用的东西来允许它与数据库交互,它可能是连接字符串,但是在 DAG 中制作引擎一次并将其传递给所有操作员比制作引擎更容易在每一个。
【问题讨论】:
-
您的任务应该创建数据库连接,不要将其作为参数传递
-
谢谢@miraculixx,有什么特别的原因(除了泡菜问题)为什么这比直接通过引擎更有好处?
-
是的,数据库连接本质上是短暂的,仅在特定进程使用的时间内存在。气流任务在执行时(可能更晚,重复)在不同的进程中实例化,可能在不同的机器上。因此,即使您可以腌制连接,它在运行时也不会对任务有用,因为它很可能无论如何都会抓住存在。一般来说,原则上,不仅在 Airflow 中,连接应该始终由同一个进程创建、管理和关闭。
标签: python sqlalchemy airflow