【问题标题】:Python SQLalchemy - can I pass the connection object between functions?Python SQLalchemy - 我可以在函数之间传递连接对象吗?
【发布时间】:2022-01-19 23:08:33
【问题描述】:

我有一个从 mysql/mariadb 读取的 python 应用程序,使用它从 api 获取数据,然后将结果插入另一个表。

我已经设置了一个带有函数的模块来连接到数据库并返回传递给其他函数/模块的连接对象。但是,我认为这可能不是正确的方法。这个想法是有一个小模块,我可以在需要连接到数据库时调用它。

另请注意,我在循环期间(以及在传递给 db_update 模块的循环内)使用相同的连接对象,并在所有操作完成后调用 close()

我有时也会从数据库中收到一些警告,这些警告大多发生在我调用db_conn.close() 的地方,所以我想我没有正确处理连接或会话/引擎。此外,日志警告中的连接 ID 不断增加,这是另一个提示,我做错了。

[Warning] Aborted connection 351 to db: 'some_db' user: 'some_user' host: '172.28.0.3' (Got an error reading communication packets)

这是一些代表我目前拥有的结构的伪代码


################
## db_connect.py
################
# imports ...
from sqlalchemy import create_engine
def db_connect():
    # get env ...
    db_string = f"mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
    try:
        engine = create_engine(db_string)
    except Exception as e:
        return None
    
    db_conn = engine.connect()
    return db_conn


################
## db_update.py
################
# imports ...
def db_insert(db_conn, api_result):
    # ...
    ins_qry = "INSERT INTO target_table (attr_a, attr_b) VALUES (:a, :b);"
    ins_qry = text(ins_qry)
    ins_qry = ins_qry.bindparams(a = value_a, b = value_b)

    try:
        db_conn.execute(ins_qry)
    except Exception as e:
        print(e)
        return None     
    return True


################
## main.py
################
from sqlalchemy import text
from db_connect import db_connect
from db_update import db_insert

def run():
    try:
        db_conn = db_connect()
        if not db_conn:
            return False
    except Exception as e:
        print(e)

    qry =  "SELECT *
            FROM some_table
            WHERE some_attr IN (:some_value);"
    qry = text(qry)
    search_run_qry = qry.bindparams(
            some_value  = 'abc'
    )
    result_list = db_conn.execute(qry).fetchall()

    for result_item in result_list:
        ## do stuff like fetching data from api for every record in the query result
        api_result = get_api_data(...)
        ## insert into db:
        db_ins_status = db_insert(db_conn, api_result)
        ## ...
    
    db_conn.close

run()

编辑:另一个问题: a) 在循环中是否可以,在每次迭代时更新以使用相同的连接,或者将引擎传递给run() 函数并在之前调用db_conn = engine.connect()db_conn.close() 是否更明智?每次更新后?

b) 我正在考虑使用ThreadPoolExecutor 而不是 API 调用的循环。这会对如何使用连接产生影响,即我可以为多个线程使用相同的连接来更新同一个表吗?

注意:我没有使用 ORM 功能,主要是因为我有很强的 DWH/SQL 背景(虽然不如 DBA),而且我习惯于编写复杂的 sql 查询。出于这个原因,我正在考虑改用 PyMySQL 连接器。

提前致谢!

【问题讨论】:

    标签: python mysql sqlalchemy mariadb


    【解决方案1】:

    是的,您可以将连接对象作为参数返回/传递,但是 db_connect 方法的目的是什么,除了测试连接?正如我所见,这种 db_connect 方法没有任何目的,因此我建议您像以前一样这样做。

    我想分享我的一个项目的代码 sn-p。

    def create_record(sql_query: str, data: tuple):
        try:
            connection = mysql_obj.connect()
            db_cursor = connection.cursor()
            db_cursor.execute(sql_query, data)
            connection.commit()
            return db_cursor, connection
        except Exception as error:
            print(f'Connection failed error message: {error}')
    

    然后用这个作为我需要的另一个

    db_cursor, connection, query_data = fetch_data(sql_query, query_data)
    

    毕竟我需要关闭与此方法和方法调用的连接。

    def close_connection(connection, db_cursor):
        """
        This method used to close SQL server connection
        """
        db_cursor.close()
        connection.close()
    

    和调用方法

     close_connection(connection, db_cursor)
    

    我不确定我是否可以分享我的 github 我的支票 link 请。在 model.py 下,您可以看到数据库方法并查看如何调用它们检查 main.py

    最好, 哈桑。

    【讨论】:

    • 谢谢哈桑。你使用的是 PyMySQL 还是 mysql-connector-python?
    • 你好 Phil,我使用 mysql-connector-python 但你也可以快速配置另一个库。没什么大不了的。
    猜你喜欢
    • 2021-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-12
    • 1970-01-01
    相关资源
    最近更新 更多