【问题标题】:Using Sqlalchemy on twisted servers to report errors at regular intervals在扭曲的服务器上使用 Sqlalchemy 定期报告错误
【发布时间】:2019-06-09 01:13:48
【问题描述】:

我的扭曲服务器连接四个客户端,为每个客户端初始化一次 Sqlalchemy 引擎。系统运行一段时间后,会报如下错误:

[Failure instance: Traceback: <class'sqlalchemy. exc. ResourceClosedError'>: This result object does not return rows. It has been closed automatically.

/usr/lib/python 2.7/threading.py:801:u bootstrap_inner

3/usr/lib/python 2.7/threading.py:754:run

4/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/twisted/_threads/_threadworker.py:46:work

5/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/twisted/_threads/_team.py:190:doWork

6 - < exception caught here >

7/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/twisted/python/threadpool.py:250:inContext

8/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/twisted/python/threadpool.py:266: <lambda>

9/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/twisted/python/context.py:122:call WithContext

10/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/twisted/python/context.py:85:call WithContext

11. / data_server.py:231: check_update_mysqldb

12. / DRV / mysqldb_driver. py: 61: search_device_by_mac

13/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/orm/query.py:2895:first:

14/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/orm/query.py:2687:u getitem_u

15/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/orm/loading.py:98:instances

16/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/util/compat.py:265:raise_from_cause

17/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/orm/loading.py:61:instances

18/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/orm/query.py:3842:row_processor

19/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/orm/loading.py:361:_instance_processor

20/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/engine/result.py:654:_getter

21/home/sites/data_collecting_server/venv/local/lib/python 2.7/site-packages/sqlalchemy/engine/result.py:1088:_non_result

22]

【问题讨论】:

  • 你见过this answer吗? sqlalchemy 作者解释说,您遇到的确切问题与跨线程共享资源有关。你确定不是吗?另请查看 [this](docs.sqlalchemy.org/en/latest/faq/…),这是解决您的特定问题的文档中的一个部分。
  • 非常感谢您的回答。我已经看到了你提到的两点。所以我只是使用了 scoped_session 机制。我目前的做法是,每当有客户端连接时,我都会初始化一个Sqlalchemy连接和引擎,然后在当前连接的每个线程函数开头调用set_session(),生成单独的会话进行操作。在线程函数结束时,我调用 close_session() 来删除会话。当前客户端对应的所有线程共享连接和引擎,但会话是独立的。
  • 我不知道我为什么这样做,但我仍然收到上面的错误。我的做法有什么问题?
  • 但这不是:'与当前客户端对应的所有线程共享连接和引擎',违反了 Mike Bayer 在我上面链接的答案中给出的建议吗?
  • 抱歉,可能是我描述的问题。线程共享引擎,会话连接使用scoped_session,每个线程都有单独的会话连接。

标签: python-2.7 sqlalchemy twisted


【解决方案1】:

我成功解决了这个问题。我写了一个测试脚本,调用这种方式来隔离会话

这是我的测试代码:

# coding=utf-8
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import or_

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from config.settings import MYSQLDB_SETTING

import threading
import multiprocessing
import time


class SqlalchemyDriver(object):

    def __init__(self, host, user, password, port, database):
        db = "mysql+mysqldb://{user}:{password}@{host}:{port}/{database}?"\
            "charset=utf8".format(
                user=user,
                password=password,
                host=host,
                port=port,
                database=database
            )
        # self.engine = create_engine(db, poolclass=NullPool)
        self.engine = create_engine(db, pool_pre_ping=True, pool_size=10, max_overflow=10, pool_timeout=30)
        self.Session = scoped_session(sessionmaker(bind=self.engine, autoflush=False))


drv = SqlalchemyDriver(
    host=MYSQLDB_SETTING["host"],
    user=MYSQLDB_SETTING["user"],
    password=MYSQLDB_SETTING["password"],
    port=MYSQLDB_SETTING["port"],
    database=MYSQLDB_SETTING["database"]
    )


class MysqldbDriver(object):

    def __init__(self):
        self.classes = self.get_table_classes()
        self.session = drv.Session()
        print(id(self.session))

    def close_session(self):
        drv.Session.remove()

    def get_table_classes(self):
        base = automap_base()
        base.prepare(drv.engine, reflect=True)
        return base.classes

    def search_device_by_mac(self, mac_address):
        time.sleep(1)
        res = self.session.query(self.classes.client_management_device). \
                            filter_by(mac_address=mac_address).first()
        print(res)
        return res


def query_func():
    mysql = MysqldbDriver()
    mysql.search_device_by_mac('xx:xx:xx:xx:xx:xx')


def my_test():
    while True:
        t1 = threading.Thread(target=query_func)
        t2 = threading.Thread(target=query_func)
        t3 = threading.Thread(target=query_func)
        t4 = threading.Thread(target=query_func)
        t5 = threading.Thread(target=query_func)
        t6 = threading.Thread(target=query_func)

        t1.start()
        t2.start()
        t3.start()
        t4.start()
        t5.start()
        t6.start()
        time.sleep(3)
        t1.join()
        t2.join()
        t3.join()
        t4.join()
        t5.join()
        t6.join()
        time.sleep(1)


p1 = multiprocessing.Process(target=my_test)
p2 = multiprocessing.Process(target=my_test)
p3 = multiprocessing.Process(target=my_test)
p4 = multiprocessing.Process(target=my_test)
print('running')
p1.start()
p2.start()
p3.start()
p4.start()
time.sleep(1)
p1.join()
p2.join()
p3.join()
p4.join()
time.sleep(1)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-09-29
    • 2018-05-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多