【问题标题】:Using multiple POSTGRES databases and schemas with the same Flask-SQLAlchemy model使用具有相同 Flask-SQLAlchemy 模型的多个 POSTGRES 数据库和模式
【发布时间】:2019-01-09 05:04:48
【问题描述】:

我将在这里非常具体,因为已经提出了类似的问题,但没有一个解决方案适用于这个问题。

我正在从事一个有四个 postgres 数据库的项目,但为了简单起见,我们假设有 2 个。即 A 和 B

A,B代表两个地理位置,但数据库中的表和架构是相同的。

示例模型:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base

db = SQLAlchemy()
Base = declarative_base()

class FRARecord(Base):
    __tablename__ = 'tb_fra_credentials'

    recnr = Column(db.Integer, primary_key = True)
    fra_code = Column(db.Integer)
    fra_first_name = Column(db.String)

这个模型在两个数据库中都有复制,但是使用不同的模式,所以要让它在 A 中工作,我需要这样做:

__table_args__ = {'schema' : 'A_schema'}

我想使用一个单独的内容提供者,该内容提供者可以访问数据库,但具有相同的方法:

class ContentProvider():
    def __init__(self, database):
        self.database = database

    def get_fra_list():
        logging.debug("Fetching fra list")
        fra_list = db.session.query(FRARecord.fra_code)

两个问题是,我如何决定指向哪个 db,以及如何不为不同的模式复制模型代码(这是 postgres 特有的问题)

这是我迄今为止尝试过的:

1) 我为每个模型制作了单独的文件并继承了它们,所以:

class FRARecordA(FRARecord):
    __table_args__ = {'schema' : 'A_schema'}

这似乎不起作用,因为我收到错误:

"Can't place __table_args__ on an inherited class with no table."

意味着在 db.Model(在其父级中)已经声明后我无法设置该参数

2) 所以我尝试对多重继承做同样的事情,

class FRARecord():
    recnr = Column(db.Integer, primary_key = True)
    fra_code = Column(db.Integer)
    fra_first_name = Column(db.String)

class FRARecordA(Base, FRARecord):
    __tablename__ = 'tb_fra_credentials'
    __table_args__ = {'schema' : 'A_schema'}

但得到了可预测的错误:

"CompileError: Cannot compile Column object until its 'name' is assigned."

显然我不能将 Column 对象移动到 FRARecordA 模型,而不必为 B 重复它们(实际上有 4 个数据库和更多模型)。

3) 最后,我正在考虑进行某种分片(这似乎是正确的方法),但我找不到如何进行此操作的示例。我的感觉是我只会使用这样的单个对象:

class FRARecord(Base):
    __tablename__ = 'tb_fra_credentials'

    @declared_attr
    def __table_args__(cls):
        #something where I go through the values in bind keys like
        for key, value in self.db.app.config['SQLALCHEMY_BINDS'].iteritems():
            # Return based on current session maybe? And then have different sessions in the content provider?

    recnr = Column(db.Integer, primary_key = True)
    fra_code = Column(db.Integer)
    fra_first_name = Column(db.String)

为了清楚起见,我访问不同数据库的意图如下:

app.config['SQLALCHEMY_DATABASE_URI']='postgresql://%(user)s:\
%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_A

app.config['SQLALCHEMY_BINDS']={'B':'postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_B,
                                  'C':'postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_C,
                                  'D':'postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_D
                                 }

POSTGRES 字典包含连接数据的所有键

我假设对于继承的对象,我只需像这样连接到正确的对象(这样 sqlalchemy 查询会自动知道):

class FRARecordB(FRARecord):
    __bind_key__ = 'B'
    __table_args__ = {'schema' : 'B_schema'}

【问题讨论】:

    标签: postgresql flask sqlalchemy flask-sqlalchemy sharding


    【解决方案1】:

    终于找到了解决办法。

    基本上,我没有为每个数据库创建新类,我只是为每个数据库使用不同的数据库连接。

    这种方法本身很常见,棘手的部分(我找不到示例)是处理架构差异。我最终这样做了:

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    Session = sessionmaker()
    
    class ContentProvider():
    
        db = None
        connection = None
        session = None
    
        def __init__(self, center):
            if center == A:
                self.db = create_engine('postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_A, echo=echo, pool_threadlocal=True)
                self.connection = self.db.connect()
                # It's not very clean, but this was the extra step. You could also set specific connection params if you have multiple schemas
                self.connection.execute('set search_path=A_schema')
            elif center == B:
                self.db = create_engine('postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_B, echo=echo, pool_threadlocal=True)
                self.connection = self.db.connect()
                self.connection.execute('set search_path=B_schema')
    
        def get_fra_list(self):
            logging.debug("Fetching fra list")
            fra_list = self.session.query(FRARecord.fra_code)
            return fra_list
    

    【讨论】:

      猜你喜欢
      • 2018-01-22
      • 2016-06-16
      • 1970-01-01
      • 2022-06-21
      • 2016-09-10
      • 1970-01-01
      • 2020-04-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多