【问题标题】:Flask SQLAlchemy reflect database objects dynamicallyFlask SQLAlchemy 动态反映数据库对象
【发布时间】:2020-02-17 18:03:42
【问题描述】:

我正在编写一个连接到旧数据库的新应用程序。现在,我反映的是数据库对象,而不是在 SQLA ORM 类中手动定义它们。我已经像这样设置了我的烧瓶应用程序:

config = Config.get_config_for_env(env)
app = Flask(name)
app.config.from_object(config)
metadata = MetaData()
db = SQLAlchemy(metadata=metadata)
db.init_app(app)
app.db = db
app.app_context().push()

# Reflect DB
db.metadata.reflect(bind=db.engine, views=True)

上面的调用,反映了整个数据库。我的应用程序通常一次接触几个表,所以懒惰地反映数据库表是有意义的。可以这样做:

db.metadata.reflect(bind=db.engine, schema='MySchema', only=['MyTable'])

为此,我需要插入一个层,以确保在执行查询之前,架构和表已得到反映。这增加了复杂性,因为所有查询都必须经过另一层代码。有没有办法在查询时根据需要隐式反映数据库模式+表?

【问题讨论】:

    标签: python sqlalchemy flask-sqlalchemy


    【解决方案1】:

    好吧,如果知道你需要的表名,那么你可以这样做:

    table_to_work_with = Table("tablename", db.metadata, bind=db.engine, autoload=True)
    

    您可以使用 sqlalchemy.inspect 来获取表名,如下所述:List database tables with SQLAlchemy

    【讨论】:

      【解决方案2】:

      AFAIK,没有办法做到这一点。它可以通过测试类来完成。像这样,self.clone() 克隆一个对象:

      class TempDbApp(BaseApp):
          def __init__(self, env_src, name='TempDbApp', *args, **kwargs):
              super().__init__('t', name, *args, **kwargs)
              self.env_src = env_src
              self.logger = logging.getLogger(__name__)
              self.schemas = ['dbo']
              self.metadata = MetaData()
      
          def setup(self):
              super().setup()
              self.test_db_name = self.create_db()
      
          def teardown(self):
              # Do not drop db at end because pool
              super().teardown()
              self.metadata.drop_all(self.db.engine)
              for schema in self.schemas:
                  if schema != 'dbo':
                      self.db.engine.execute(DropSchema(schema))
              self.drop_db()
      
          def create_db(self):
              url = copy(self.db.engine.url)
              engine = create_engine(url, connect_args={'autocommit': True}, isolation_level='AUTOCOMMIT')
              res = engine.execute(f"exec dbo.usp_createShortLivedDB")
              tempdbname = res.fetchone()[0]
              res.close()
              engine.dispose()
              self.db.engine.url.database = tempdbname
              return tempdbname
      
          def drop_db(self):
              url = copy(self.db.engine.url)
              db = url.database
              url.database = 'master'
              engine = create_engine(url, connect_args={'autocommit': True}, isolation_level='AUTOCOMMIT')
      
              if database_exists(url):
                  assert db != 'master'
                  res = engine.execute(f"EXEC dbo.usp_deleteshortliveddb @dbname = '{db}'")
                  res.close()
      
          def fetch_schemas(self):
              results = self.db.engine.execute('SELECT name FROM sys.schemas')
              for schema in results:
                  self.schemas.append(schema[0])
              results.close()
      
          def create_schema(self, schema):
              with self.session() as sess:
                  sess.execute(CreateSchema(schema))
              self.schemas.append(schema)
      
          @lru_cache(maxsize=2048)
          def clone(self, table, schema):
              if schema not in self.schemas:
                  self.create_schema(schema)
      
              self.metadata.reflect(self.engine_src, schema=schema, only=[table])
              self.metadata.drop_all(self.db.engine)  # precautionary in case previous test didn't clean things up
              self.metadata.create_all(self.db.engine)
      
          @lru_cache(maxsize=2048)
          def get_table(self, table, schema, db=None):
              self.clone(table, schema)
              return super().get_table(table, schema, db)
      
          @lru_cache(maxsize=2048)
          def get_selectable(self, table, schema, db=None):
              self.clone(table, schema)
              return Table(table, self.db.metadata, schema=schema, autoload=True, autoload_with=self.db.get_engine(bind=db))
      
          @lazy
          def engine_src(self):
              conn_string = f'mssql+pymssql://user:pass@{self.env_src}-slo-sql-ds/mydb?charset=utf8'
              return create_engine(conn_string, isolation_level='AUTOCOMMIT')
      
          def start(self):
              raise Exception("You must not call this method - this app is for testing")
      

      然后可以使用多重继承创建一个测试类:

      @final
      class MyRealWorldClassTest(TempDbApp, MyRealWorldClass):
          pass
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-01-05
        • 2016-08-21
        • 1970-01-01
        • 2019-07-30
        相关资源
        最近更新 更多