pythonwl

多app应用(了解)

### 多个app实例(启用)
from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask, current_app
app1 = Flask(\'app01\')
app2 = Flask(\'app02\')

@app1.route(\'/index\')
def index():
    return "app01"

@app2.route(\'/index2\')
def index2():
    return "app2"

# http://www.oldboyedu.com/index
# http://www.oldboyedu.com/sec/index2
dm = DispatcherMiddleware(app1, {
    \'/sec\': app2,
})

if __name__ == "__main__":
    run_simple(\'localhost\', 5000, dm)
    # 请求来了,会执行dm()--->__call__

flask-script(制定命令)

1 模拟出类似django的启动方式:python manage.py runserver
2 pip install flask-script
3 把excel的数据导入数据库,定制个命令,去执行(openpyxl)
	python manage.py insertdb -f xxx.excl -t aa
    
4 使用
    -方式一:python manage.py runserver
    from flask import Flask
    from flask_script import Manager
    app = Flask(__name__)
    manager=Manager(app)
    if __name__ == \'__main__\':
        manager.run()
    -方式二:自定制命令
        @manager.command
        def custom(arg):
            print(arg)
        @manager.option(\'-n\', \'--name\', dest=\'name\')
        @manager.option(\'-u\', \'--url\', dest=\'url\')
        def cmd(name, url):
            print(name, url)
5 创建超级用户
6 现在有一万条excel用户,批量导入到数据库中
	-navicate直接支持
    -脚本
    -flask-script

sqlachemy

3.0 概念

1 sqlachemy:第三方orm框架(对象关系映射)
	-go 中gorm,xorm
    -python中:django orm,sqlachemy,peewee
    https://www.cnblogs.com/liuqingzheng/articles/9006025.html
2 django orm,只能在django中用,不能单独用

3 使用 pip install sqlachemy
4 SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件

5 补充:django orm反向生成models
	-python manage.py inspectdb > app/models.py

基本使用(原生sql)

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

# 第一步生成一个engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/flask?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第二步:创建连接(执行原生sql)
conn = engine.raw_connection()
# 第三步:获取游标对象
cursor = conn.cursor()

# 第四步:具体操作
cursor.execute(\'select * from boy\')

res=cursor.fetchall()
print(res)

# 比pymysql优势在,有数据库连接池

orm使用

# 创建一个个类(继承谁?字段怎么写)
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 字段和字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 制造了一个类,作为所有模型类的基类
Base = declarative_base()

class User(Base):
    __tablename__ = \'users\'  # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
    id = Column(Integer, primary_key=True)  # id 主键
    # mysql中主键自动建索引:聚簇索引
    # 其他建建的索引叫:辅助索引
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    # email = Column(String(32), unique=True)  # 唯一
    # #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    # ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
    # extra = Column(Text, nullable=True)

    #类似于djagno的 Meta
    # __table_args__ = (
    #     UniqueConstraint(\'id\', \'name\', name=\'uix_id_name\'), #联合唯一
    #     Index(\'ix_id_name\', \'name\', \'email\'), #索引
    # )



# 创建表
def create_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象创建表
    Base.metadata.create_all(engine)

# 删除表
def drop_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象删除所有表
    Base.metadata.drop_all(engine)

if __name__ == \'__main__\':
    # create_table()
    drop_table()

# 创建库?手动创建库
# 问题,sqlachemy支持修改字段吗?不支持

线程安全

#基于scoped_session实现线程安全



from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User  # pycharm报错,不会影响我们
from sqlalchemy.orm import scoped_session

# 1 制作engine
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)

# 2 制造一个 session 类(会话)
Session = sessionmaker(bind=engine)    # 得到一个类
# 3 得到一个session对象(线程安全的session)
#现在的session已经不是session对象了
#为什么线程安全,还是用的local
session = scoped_session(Session)

# session=Session()

# 4 创建一个对象
obj1 = User(name="2008")
# 5 把对象通过add放入
session.add(obj1)
# session.aaa()
# 6 提交
session.commit()
session.close()


# 类不继承Session类,但是有该类的所有方法(通过反射,一个个放进去)

# scoped_session.add------->instrument(name)--->do函数内存地址---》现在假设我要这么用:session.add()--->do()
# scoped_session.close----->instrument(name)--->do函数内存地址

基本增删查改

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby
from sqlalchemy.orm import scoped_session
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)
# session = scoped_session(Session)
session=Session()

####1 新增多个对象
# obj=User(name=\'xxx\')
# obj2=User(name=\'yyyy\')
# obj3=User(name=\'zzz\')
#新增同样对象
# session.add_all([obj,obj2,obj3])
#新增不同对象
# session.add_all([Person(name=\'lqz\'),Hobby()])
####2 简单删除(查到删除)
# res=session.query(User).filter_by(name=\'2008\').delete()
# res=session.query(User).filter(User.id>=2).delete()
# # 影响1行
# print(res)

#### 3 修改
# res=session.query(User).filter_by(id=1).update({User.name:\'ccc\'})
# res=session.query(User).filter_by(id=1).update({\'name\':\'ccc\'})

# session.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False) # 如果要把它转成字符串相加
# session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate")  ## 如果要把它转成数字相加


####4 基本查询操作

# res=session.query(User).all()
# print(type(res))
# res=session.query(User).first()
# print(res)

#filter传的是表达式,filter_by传的是参数
# res=session.query(User).filter(User.id==1).all()
# res=session.query(User).filter(User.id>=1).all()
# res=session.query(User).filter(User.id<1).all()

# res=session.query(User).filter_by(name=\'ccc099\').all()


#了解
# res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name=\'ccc099\').all()
# print(res)


session.commit()
# 并没有真正关闭连接,而是放回池中
session.close()

高级操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session=Session()


# 1 查询名字为lqz的所有user对象
# ret = session.query(User).filter_by(name=\'ccc099\').all()
# 2 表达式,and条件连接
# ret = session.query(User).filter(User.id > 1, User.name == \'egon\').all()
# 查找id在1和10之间,并且name=egon的对象
# ret = session.query(User).filter(User.id.between(1, 10), User.name == \'egon\').all()

# in条件(class_,因为这是关键字,不能直接用)
# ret = session.query(User).filter(User.id.in_([1,3,4])).all()

# 取反 ~
ret = session.query(User).filter(~User.id.in_([1,3,4])).all()

#二次筛选
# select *
# ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name=\'egon\'))).all()
# # select name,id 。。。。
# ret = session.query(User.id,User.name).filter(User.id.in_(session.query(User.id).filter_by(name=\'egon\'))).all()

\'\'\'
SELECT users.id AS users_id, users.name AS users_name 
FROM users 
WHERE users.id IN (SELECT users.id AS users_id 
FROM users 
WHERE users.name = %(name_1)s)

\'\'\'


#
from sqlalchemy import and_, or_
#or_包裹的都是or条件,and_包裹的都是and条件
#查询id>3并且name=egon的人
# ret = session.query(User).filter(and_(User.id > 3, User.name == \'egon\')).all()

# 查询id大于2或者name=ccc099的数据
# ret = session.query(User).filter(or_(User.id > 2, User.name == \'ccc099\')).all()
# ret = session.query(User).filter(
#     or_(
#         User.id < 2,
#         and_(User.name == \'egon\', User.id > 3),
#         User.extra != ""
#     )).all()
# print(ret)

\'\'\'
select *from user where id<2 or (name=egon and id >3) or extra !=\'\'
\'\'\'


# 通配符,以e开头,不以e开头
# ret = session.query(User).filter(User.name.like(\'e%\')).all()
# ret = session.query(User).filter(~User.name.like(\'e%\')).all()

# 限制,用于分页,区间 limit
# 前闭后开区间,1能取到,3取不到
ret = session.query(User)[1:3]

\'\'\'
select * from users limit 1,2;
\'\'\'


# 排序,根据name降序排列(从大到小)
# ret = session.query(User).order_by(User.name.desc()).all()
# ret = session.query(User).order_by(User.name.asc()).all()
#第一个条件降序排序后,再按第二个条件升序排
# ret = session.query(User).order_by(User.id.asc(),User.name.desc()).all()
# ret = session.query(User).order_by(User.name.desc(),User.id.asc()).all()


# 分组
from sqlalchemy.sql import func

# ret = session.query(User).group_by(User.name).all()
#分组之后取最大id,id之和,最小id
# sql 分组之后,要查询的字段只能有分组字段和聚合函数
# ret = session.query(
#     func.max(User.id),
#     func.sum(User.id),
#     func.min(User.id),
#     User.name).group_by(User.name).all()
# \'\'\'
# select max(id),sum(id),min(id) from user group by name;
#
# \'\'\'
# for obj in ret:
#     print(obj[0],\'----\',obj[1],\'-----\',obj[2],\'-----\',obj[3])
# print(ret)

#haviing筛选
# ret = session.query(
#     func.max(User.id),
#     func.sum(User.id),
#     func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()

\'\'\'
select max(id),sum(id),min(id) from user group by name having min(id)>2;

\'\'\'
print(ret)
session.commit()

session.close()

多表操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby,Boy,Girl,Boy2Girl
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session=Session()



###  1 一对多插入数据
# obj=Hobby(caption=\'足球\')
# session.add(obj)
# p=Person(name=\'张三\',hobby_id=2)
# session.add(p)
### 2 方式二(默认情况传对象有问题)
###### Person表中要加 hobby = relationship(\'Hobby\', backref=\'pers\')
# p=Person(name=\'李四\',hobby=Hobby(caption=\'美女\'))
# 等同于
# p=Person(name=\'李四2\')
# p.hobby=Hobby(caption=\'美女2\')
# session.add(p)

## 3 方式三,通过反向操作
# hb = Hobby(caption=\'人妖\')
# hb.pers = [Person(name=\'文飞\'), Person(name=\'博雅\')]
# session.add(hb)


#### 4 查询(查询:基于连表的查询,基于对象的跨表查询)
### 4.1 基于对象的跨表查询(子查询,两次查询)
# 正查
# p=session.query(Person).filter_by(name=\'张三\').first()
# print(p)
# print(p.hobby.caption)
# 反查
# h=session.query(Hobby).filter_by(caption=\'人妖\').first()
# print(h.pers)

### 4.2 基于连表的跨表查(查一次)
# 默认根据外键连表
# isouter=True 左外连,表示Person left join Hobby,没有右连接,反过来即可
# 不写 inner join
# person_list=session.query(Person,Hobby).join(Hobby,isouter=True).all()
# print(person_list)
# print(person_list)
# for row in person_list:
#     print(row[0].name,row[1].caption)

# \'\'\'
# select * from person left join hobby on person.hobby_id=hobby.id
# \'\'\'
#
# ret = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id)
# print(ret)
# \'\'\'
# select * from user,hobby where user.id=favor.nid;
#
# \'\'\'


#join表,默认是inner join
# ret = session.query(Person).join(Hobby)
# # ret = session.query(Hobby).join(Person,isouter=True)
# \'\'\'
# SELECT *
# FROM person INNER JOIN hobby ON hobby.id = person.hobby_id
# \'\'\'
# print(ret)


# 指定连表字段(从来没用过)
# ret = session.query(Person).join(Hobby,Person.nid==Hobby.id, isouter=True)
# # ret = session.query(Person).join(Hobby,Person.hobby_id==Hobby.id, isouter=True).all()
# print(ret)
\'\'\'
SELECT *
FROM person LEFT OUTER JOIN hobby ON person.nid = hobby.id

\'\'\'

# print(ret)




# 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
# union和union all的区别?
# q1 = session.query(User.name).filter(User.id > 2)  # 6条数据
# q2 = session.query(User.name).filter(User.id < 8) # 2条数据


# q1 = session.query(User.id,User.name).filter(User.id > 2)  # 6条数据
# q2 = session.query(User.id,User.name).filter(User.id < 8) # 2条数据
# ret = q1.union_all(q2).all()
# ret1 = q1.union(q2).all()
# print(ret)
# print(ret1)
#
# q1 = session.query(User.name).filter(User.id > 2)
# q2 = session.query(Hobby.caption).filter(Hobby.nid < 2)
# ret = q1.union_all(q2).all()







#### 多对多

# session.add_all([
#     Boy(hostname=\'霍建华\'),
#     Boy(hostname=\'胡歌\'),
#     Girl(name=\'刘亦菲\'),
#     Girl(name=\'林心如\'),
# ])
# session.add_all([
#     Boy2Girl(girl_id=1, boy_id=1),
#     Boy2Girl(girl_id=2, boy_id=1)
# ])


##### 要有girls = relationship(\'Girl\', secondary=\'boy2girl\', backref=\'boys\')
# girl = Girl(name=\'张娜拉\')
# girl.boys = [Boy(hostname=\'张铁林\'),Boy(hostname=\'费玉清\')]
# session.add(girl)

# boy=Boy(hostname=\'蔡徐坤\')
# boy.girls=[Girl(name=\'谢娜\'),Girl(name=\'巧碧螺\')]
# session.add(boy)
# session.commit()


# 基于对象的跨表查

# girl=session.query(Girl).filter_by(id=3).first()
# print(girl.boys)

#### 基于连表的跨表查询

# 查询蔡徐坤约过的所有妹子
\'\'\'
select girl.name from girl,boy,Boy2Girl where boy.id=Boy2Girl.boy_id and girl.id=Boy2Girl.girl_id where boy.name=\'蔡徐坤\'

\'\'\'
# ret=session.query(Girl.name).filter(Boy.id==Boy2Girl.boy_id,Girl.id==Boy2Girl.girl_id,Boy.hostname==\'蔡徐坤\').all()

\'\'\'
select girl.name from girl inner join Boy2Girl on girl.id=Boy2Girl.girl_id inner join boy on boy.id=Boy2Girl.boy_id where boy.hostname=\'蔡徐坤\'

\'\'\'
# ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter(Boy.hostname==\'蔡徐坤\').all()
ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter_by(hostname=\'蔡徐坤\').all()
print(ret)


### 执行原生sql(用的最多的)
### django中orm如何执行原生sql
#
# cursor = session.execute(\'insert into users(name) values(:value)\',params={"value":\'xxx\'})
# print(cursor.lastrowid)
# session.commit()

session.close()

models.py




# 创建一个个类(继承谁?字段怎么写)
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 字段和字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
# 制造了一个类,作为所有模型类的基类
Base = declarative_base()

class User(Base):
    __tablename__ = \'users\'  # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
    id = Column(Integer, primary_key=True)  # id 主键
    # mysql中主键自动建索引:聚簇索引
    # 其他建建的索引叫:辅助索引
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    # email = Column(String(32), unique=True)  # 唯一
    # #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    # ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
    # extra = Column(Text, nullable=True)

    #类似于djagno的 Meta
    # __table_args__ = (
    #     UniqueConstraint(\'id\', \'name\', name=\'uix_id_name\'), #联合唯一
    #     Index(\'ix_id_name\', \'name\', \'email\'), #索引
    # )
    def __str__(self):
        return self.name
    def __repr__(self):
        # python是强类型语言
        return self.name+str(self.id)




# 一对多关系

# 一个Hobby可以有很多人喜欢
# 一个人只能由一个Hobby
class Hobby(Base):
    __tablename__ = \'hobby\'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default=\'篮球\')


class Person(Base):
    __tablename__ = \'person\'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名,uselist=False
    # 一对多的关系,关联字段写在多的一方
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 默认可以为空

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 类名,backref用于反向查询
    hobby = relationship(\'Hobby\', backref=\'pers\')


# 多对多关系
# 实实在在存在的表
class Boy2Girl(Base):
    __tablename__ = \'boy2girl\'
    id = Column(Integer, primary_key=True, autoincrement=True) # autoincrement自增,默认是True
    girl_id = Column(Integer, ForeignKey(\'girl.id\'))
    boy_id = Column(Integer, ForeignKey(\'boy.id\'))



class Girl(Base):
    __tablename__ = \'girl\'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
    __tablename__ = \'boy\'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    # secondary 通过哪个表建关联,跟django中的through一模一样
    girls = relationship(\'Girl\', secondary=\'boy2girl\', backref=\'boys\')

# 创建表
def create_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象创建表
    Base.metadata.create_all(engine)

# 删除表
def drop_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象删除所有表
    Base.metadata.drop_all(engine)

if __name__ == \'__main__\':
    create_table()  # 原来已经存在user表,再执行一次不会有问题
    # drop_table()

# 创建库?手动创建库
# 问题,sqlachemy支持修改字段吗?不支持

Flask-SQLAlchemy

flask-migrate
    -python3 manage.py db init 初始化:只执行一次
    -python3 manage.py db migrate 等同于 makemigartions
    -python3 manage.py db upgrade 等同于migrate
    
Flask-SQLAlchemy如何使用
    1 from flask_sqlalchemy import SQLAlchemy
    2 db = SQLAlchemy()
    3 db.init_app(app)
    4 以后在视图函数中使用
    	-db.session 就是咱们讲的session
        
flask-migrate的使用(表创建,字段修改)
    1 from flask_migrate import Migrate,MigrateCommand
    2 Migrate(app,db)
    3 manager.add_command(\'db\', MigrateCommand)
直接使用
    -python3 manage.py db init 初始化:只执行一次,创建migrations文件夹
    -python3 manage.py db migrate 等同于 makemigartions
    -python3 manage.py db upgrade 等同于migrate

分类:

技术点:

相关文章: