SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

1
pip3 install sqlalchemy

Flask-SQLAlchemy常用操作

组成部分:

  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池
  • Dialect,选择连接数据库的DB API种类
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言

SQLAlchemy本身无法操作数据库,其本质上是依赖pymysql.MySQLdb,mssql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

SQLAlchemy用一个字符串表示连接信息:

'数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'

 

1
2
3
4
5
6
7
8
9
10
11
12
13
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

底层处理

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )

# 新插入行自增ID
# cur.lastrowid

# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# )


# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )

# 执行SQL
# cur = engine.execute('select * from hosts')
# 获取第一行数据
# cur.fetchone()
# 获取第n行数据
# cur.fetchmany(3)
# 获取所有数据
# cur.fetchall()

说白了就是使用pymysql的方法一样.

二. 使用

1. 执行原生SQL语句

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
 
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程时,最多等待的时间,超时报错,默认30秒
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置),-1代表永远不回收,即一直被重用
)
 
 
def task(arg):
    conn = engine.raw_connection()  #拿到的是一个原生的pymysql连接对象
    cursor = conn.cursor()
    cursor.execute(
        "select * from t1"
    )
    result = cursor.fetchall()
    cursor.close()
    conn.close()
 
 
for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()

  

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)


def task(arg):
    conn = engine.contextual_connect()
    with conn:
        cur = conn.execute(
            "select * from t1"
        )
        result = cur.fetchall()
        print(result)


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()
View Code
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.result import ResultProxy
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)


def task(arg):
    cur = engine.execute("select * from t1")
    result = cur.fetchall()
    cur.close()
    print(result)


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()
View Code

注意: 查看连接,进程cmd,mysql中>输入  show status like 'Threads%';

2. ORM

a. 创建数据库表

创建单表

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()   # 创建对象的基类:

# 定义User对象:
class Users(Base):
    # 表的名字:
    __tablename__ = 'users'

    # 表的结构:
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False,default='xx')   # index指定是否是索引,nullable是否能为空
    email = Column(String(32), unique=True)   # 指定唯一
    ctime = Column(DateTime, default=datetime.datetime.now) #注意,此处设置时datetime.datetime.now若加了括号,则时间永远是程序启动时的时间,后面创建数据时,不会变化
    extra = Column(Text, nullable=True)

    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一索引
        Index('ix_id_name', 'name', 'email'), #给name和email创建普通索引,索引名为ix_id_name
    )


def init_db():
    """
    根据类创建数据库表
    :return: 
    """
    # 初始化数据库连接:
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine) #找到所有继承了Base的类,按照其结构建表


def drop_db():
    """
    根据类删除数据库表
    :return: 
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_db()
    init_db()

  

默认建的表的引擎是MyISAM,如果要设置成InnoDB(支持事务),该怎么设置呢?

    __table_args__ = {
        'mysql_engine': 'InnoDB',   # 指定表的引擎
        'mysql_charset': 'utf8'     # 指定表的编码格式
    }

 

FK,M2M关系的创建

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime,UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()   # 创建对象的基类:


# ##################### 一对多示例 #########################
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  #建FK关系

    # 与生成表结构无关,仅用于查询方便
    hobby = relationship("Hobby", backref='pers')   #反向关联的名字


# ##################### 多对多示例 #########################
# 这里多对多需要自己建第三张表,并绑定关系
class Server2Group(Base):   
    __tablename__ = 'server2group'
    id = Column(Integer, primary_key=True, autoincrement=True)  #autoincrement 设置自增
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))


class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便
    servers = relationship('Server', secondary='server2group', backref='groups')    #反向关联的名字


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


def init_db():
    """
    根据类创建数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/userinfo?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)


def drop_db():
    """
    根据类删除数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/userinfo?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_db()
    init_db()

 

SQLALchemy不同于Django的ORM,当创建多对多关联事,不会自动创建第三张表,需要我们自己定义关系表,进行关联

 

b. 操作数据库表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
  
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)    #创建Session类
  
# 每次执行数据库操作时,都需要创建一个session
session = Session()    # 创建session对象:
  
# ############# 执行ORM操作 #############
# 创建新User对象
obj1 = Users(name="alex1")    
# 添加到session:
session.add(obj1)
# 提交即保存到数据库:
session.commit()
# 关闭session
session.close()

 

c.通过原生SQL语句执行

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'hc'})
# 注意占位符和传参的形式
session.commit()
print(cursor.lastrowid)

session.close()

原生SQL语句
View Code

相关文章: