【问题标题】:Airflow can't pickle _thread._local objects气流不能腌制 _thread._local 对象
【发布时间】:2019-12-14 11:00:27
【问题描述】:

我目前正在我的 DAG 中创建一个引擎,并将这个 sqlalchemy 引擎作为参数传递给 PythonOperator 以执行一些数据库工作。例如

PythonOperator(python_callable=my_callable,
                               op_args=[engine],
                               provide_context=True,
                               task_id = 'my_task',
                               dag=dag)

当我尝试清除任务状态时,我得到一个错误

File "/opt/conda/lib/python3.7/copy.py", line 169, in deepcopy
rv = reductor(4)
TypeError: can't pickle _thread._local objects

这很可能是因为您无法腌制引擎对象:

pickle.dumps(engine)
TypeError: can't pickle _thread._local objects

我想知道是否有解决此问题的好方法,以便我可以有效地使用 Airflow 网络服务器。我需要通过 python 可调用的东西来允许它与数据库交互,它可能是连接字符串,但是在 DAG 中制作引擎一次并将其传递给所有操作员比制作引擎更容易在每一个。

【问题讨论】:

  • 您的任务应该创建数据库连接,不要将其作为参数传递
  • 谢谢@miraculixx,有什么特别的原因(除了泡菜问题)为什么这比直接通过引擎更有好处?
  • 是的,数据库连接本质上是短暂的,仅在特定进程使用的时间内存在。气流任务在执行时(可能更晚,重复)在不同的进程中实例化,可能在不同的机器上。因此,即使您可以腌制连接,它在运行时也不会对任务有用,因为它很可能无论如何都会抓住存在。一般来说,原则上,不仅在 Airflow 中,连接应该始终由同一个进程创建、管理和关闭。

标签: python sqlalchemy airflow


【解决方案1】:

如果有解决此问题的好方法,我可以有效地使用 Airflow 网络服务器。我需要通过 python 可调用的东西来允许它与数据库交互

传递连接字符串是可能的,但它不应包含凭据(用户 ID、密码),因为您不希望凭据以纯格式存储。 Airflow为此提供了两个概念,Variables和Connections,见this answer for details.

在 DAG 中制作一次引擎并将其传递给所有 Operator 比在每个人中制作引擎更容易

实际上 - 不。乍一看似乎更容易,但仔细检查却不是一个好主意。

就其本质而言,数据库连接是短暂的,并且仅在特定进程使用的时间内存在。气流任务在执行时(可能更晚,重复)在不同的进程中实例化,可能在不同的机器上。因此,即使您可以腌制连接,它在运行时也不会对任务有用,因为它很可能无论如何都会抓住存在。

一般而言,原则上,不仅在 Airflow 中,连接应该始终由同一个进程创建、管理和关闭。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-04-07
    • 2022-01-11
    • 1970-01-01
    • 2016-03-27
    • 1970-01-01
    • 2018-10-30
    • 2022-07-16
    • 2020-01-20
    相关资源
    最近更新 更多