【发布时间】:2019-07-22 12:23:33
【问题描述】:
我正在尝试构建一个服务器来采样流式价格馈送并使用 SQLAlchemy 更新 postgres 数据库。我正在使用映射类的线程实例,这似乎可以工作但不稳定。
Stream 类的 1 或 2 个实例没有问题,但如果是 10 个,线程会随机且静默地失败。每次在它失败之前,SQLAlchemy 都会给出一条错误消息,所以看起来这就是杀死线程的原因。流没有问题,它总是稳定的。
我是否错过了 SQLAlchemy 设置中的某些内容?有没有更好的方法将多个实时订阅馈送到 SQL 中?
代码:
import time
import json
from threading import Thread, Lock
import sqlalchemy as db
from sqlalchemy.orm import scoped_session, sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
# Setup SQLAlchemy
engine = db.create_engine('postgresql://localhost:5432/Project', echo=False)
metadata = db.MetaData(bind=engine)
Session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()
Base.metadata.create_all(engine)
session = Session()
#DB classes
#static data table
class StockMaster(Base):
__tablename__ = 'stock_master'
id = db.Column(db.Integer, primary_key=True)
ticker = db.Column(db.String)
stock_name = db.Column(db.String)
@classmethod
def find_by_ticker(cls,ticker):
return session.query(StockMaster).filter(StockMaster.ticker==ticker).first()
#live data table
class StockLive(Base):
__tablename__ = 'stock_live'
id = db.Column(db.Integer, primary_key=True)
quote = db.Column(db.Numeric)
timestamp = db.Column(db.Numeric)
ticker_id = db.Column(db.Integer, db.ForeignKey('stock_master.id'))
ticker = relationship("StockMaster", foreign_keys=[ticker_id])
def __init__(self, quote, ticker_id, timestamp):
self.quote=quote
self.ticker_id=ticker_id
self.timestamp=timestamp
def save_to_db(self):
session.add(self)
session.commit()
@classmethod
def find_by_ticker_id(cls,ticker_id):
return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()
@classmethod
def find_by_ticker(cls,ticker):
ticker_id = StockMaster.find_by_ticker(ticker).id
return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()
class Stream(Thread):
def __init__(self,ticker):
Thread.__init__(self)
self.ticker=ticker
self.quote=1
self.data_set = StockLive.find_by_ticker(self.ticker)
self.count=0
def run(self):
con.subscribe(self.ticker)
current_mid=1
while True:
new_data = json.loads(con.get_price(self.ticker).to_json())
new_mid = new_data['Mid']
if new_mid == current_mid:
pass
else:
current_mid = new_mid
self.data_set.quote = current_mid
self.data_set.timestamp = time.time()
try:
self.data_set.save_to_db()
self.count+=1
except:
self.data_set = StockLive.find_by_ticker(self.ticker)
print('error saving to db for '+self.ticker)
time.sleep(.1)
if __name__ == '__main__':
threads={}
for ticker in tickerlist:
try:
threads[ticker]=Stream(ticker)
threads[ticker].setName('Thread ' + ticker)
threads[ticker].start()
except:
print('Error setting up '+ticker)
while True:
for ticker in tickerlist:
if threads[ticker].isAlive()==False:
threads[ticker]=Stream(ticker)
SQLAlchemy 错误信息:
/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2323: SAWarning:当前未使用“Session.add()”操作 在刷新过程的执行阶段支持。结果可能 不一致。考虑使用替代事件侦听器或 而是连接级操作。 % 方法) /anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2425: SAWarning: 属性历史事件累积在 1 个以前的干净 内部刷新事件处理程序中的实例已被重置,并将 不会导致数据库更新。考虑使用 set_committed_value() 在内部刷新事件处理程序中以避免此警告。 % 长度) 线程 Thread MSFT 中的异常:回溯(最后一次调用):
文件 “/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2436 行,在 _flush transaction.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 465 行,提交中 self._assert_active(prepared_ok=True) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 285 行,在 _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closed在处理上述异常的过程中,又发生了一个异常:
Traceback(最近一次调用最后一次):文件 “”,第 48 行,运行中 self.data_set.save_to_db() 文件“”,第 44 行,在 save_to_db session.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 954 行,提交中 self.transaction.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 467 行,提交中 self._prepare_impl() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 447 行,在 _prepare_impl self.session.flush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2313 行,齐平 self._flush(objects) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2440 行,在 _flush transaction.rollback(_capture_exception=True) 文件 "/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", 第 76 行,在 退出 compat.reraise(type_, value, traceback) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py”, 第 249 行,在再加注中 提高价值文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2440 行,在 _flush transaction.rollback(_capture_exception=True) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 483 行,在回滚中 self._assert_active(prepared_ok=True, rollback_ok=True) 文件 "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 第 285 行,在 _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closed
在处理上述异常的过程中,又发生了一个异常:
Traceback(最近一次调用最后一次):文件 “/anaconda3/lib/python3.7/threading.py”,第 917 行,在 _bootstrap_inner self.run() 文件“”,第 53 行,运行中 self.data_set = StockLive.find_by_ticker(self.ccy) 文件“”,第 52 行,在 find_by_ticker ticker_id = StockMaster.find_by_ticker(ticker).id 文件“”,第 23 行,在 find_by_ticker 返回 session.query(StockMaster).filter(StockMaster.ticker==ticker).first() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”, 第 2895 行,首先 ret = list(self[0:1]) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”,行 2687,在 getitem 中 返回列表(res)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”,行 2994,在 iter self.session._autoflush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 1493 行,在 _autoflush 中 self.flush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2313 行,齐平 self._flush(objects) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2400 行,在 _flush subtransactions=True)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 865 行,开始 嵌套=嵌套)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 297 行,在 _begin self._assert_active() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 264 行,在 _assert_active “此会话处于‘准备好的’状态;没有进一步” sqlalchemy.exc.InvalidRequestError:此会话处于‘准备好的’状态 状态;此事务中不能再发出 SQL。
【问题讨论】:
-
我认为您需要更改创建引擎中的
pool设置。默认情况下它是 5(你的代码吱吱作响超过 5 个线程)所以将其更改为你的需要。
标签: python python-3.x sqlalchemy