一、介绍

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
快速安装
pip3 install sqlalchemy

Python  Flask SQLALchemy基础知识  

组成部分:
  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池
  • Dialect,选择连接数据库的DB API种类
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

二、使用 

1. 执行原生SQL语句
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)—— -1 永不回收
)


def task(arg):
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from t1"
    )
    result = cursor.fetchall()
    cursor.close()
    conn.close()


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)


def task(arg):
    conn = engine.contextual_connect()
    with conn:
        cur = conn.execute(
            "select * from t1"
        )
        result = cur.fetchall()
        print(result)


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.result import ResultProxy
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)


def task(arg):
    cur = engine.execute("select * from t1")
    result = cur.fetchall()
    cur.close()
    print(result)


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()
注意: 查看连接 show status like 'Threads%';
2. ORM
a. 创建数据库表
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 # Author:supery
 4 
 5 import datetime
 6 from sqlalchemy import create_engine
 7 from sqlalchemy.ext.declarative import declarative_base
 8 from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
 9 
10 Base = declarative_base()
11 
12 
13 class Users(Base):
14     __tablename__ = 'users'
15 
16     id = Column(Integer, primary_key=True)          # 主键
17     name = Column(String(32), index=True, nullable=False)   # 添加索引,不能为空
18     email = Column(String(32), unique=True)
19     ctime = Column(DateTime, default=datetime.datetime.now)
20     # datetime.datetime.now不能加括号,加括号默认值就是第一次插入时间
21     # extra = Column(Text, nullable=True)
22 
23     __table_args__ = (
24         UniqueConstraint('id', 'name', name='uix_id_name'),    # 联合唯一索引
25         Index('ix_id_name', 'name', 'email'),                   # 普通索引
26     )
27     _table_arhs__ = {
28         'mysql_engine': 'InnoDb',                   # 设置表引擎
29         'mysql_charset': 'utf8'                     # 设置表字符集
30     }
31 
32 
33 def init_db():
34     """
35     根据类创建数据库表
36     :return:
37     """
38     engine = create_engine(
39         "mysql+pymysql://root:123.com@10.0.0.10:3306/day123?charset=utf8",
40         max_overflow=0,  # 超过连接池大小外最多创建的连接
41         pool_size=5,  # 连接池大小
42         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
43         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)——永不回收-1
44     )
45 
46     Base.metadata.create_all(engine)
47 
48 
49 def drop_db():
50     """
51     根据类删除数据库表
52     :return:
53     """
54     engine = create_engine(
55         "mysql+pymysql://root:123.com@10.0.0.10:3306/day123?charset=utf8",
56         max_overflow=0,  # 超过连接池大小外最多创建的连接
57         pool_size=5,  # 连接池大小
58         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
59         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)——永不回收-1
60     )
61 
62     Base.metadata.drop_all(engine)
63 
64 
65 if __name__ == '__main__':
66     # 初始化数据库,先删除再创建
67     drop_db()
68     init_db()
创建单表 

相关文章: