Mr-shen

form表单及数据库表操作

1、form表单验证  wtforms

需要下载包 pip3 install wtforms

简单使用:

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms import validators
from wtforms.fields import simple
from wtforms import widgets

app = Flask(__name__, template_folder=\'templates\')
# app.debug = True

class LoginForm(Form):
    name = simple.StringField(
        # 标签名
        label=\'用户名\',
        validators=[
            # 必须要有,错误信息
            validators.DataRequired(message=\'用户名不能为空!\'),
            # 校验规则
            validators.Length(min=6, max=18, message=\'用户名长度必须大于%(min)d且小于%(max)d\')
        ],
        widget=widgets.TextInput(),  # 页面上显示的类型text或者password
        render_kw={\'class\': \'outter\'}  # 给标签设置类属性
    )
    pwd = simple.PasswordField(
        label=\'密码\',
        validators=[
            validators.DataRequired(message=\'密码不能为空!\'),
            validators.Length(min=8, message=\'密码长度必须大于%(min)d\'),
            # 可以使用正则
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message=\'密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符\')
            ],
        widget=widgets.PasswordInput(),
        render_kw={\'class\': \'password\'}
    )

@app.route(\'/login\', methods=[\'POST\', \'GET\'])
def login():
    if request.method == \'GET\':
        form = LoginForm()
        return render_template(\'login.html\', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print(\'用户提交的是:\', form.data)
        else:
            print(form.errors)
        return render_template(\'login.html\', form=form)
if __name__ == \'__main__\':
    app.run()
login.py
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
    <input type="submit" value="提交">
</form>
</body>
</html>
login.html

常规使用:

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder=\'templates\')
app.debug = True


# 需要用什么字段导什么字段来定义
class RegisterForm(Form):
    # 自定义钩子校验,也可以用validators.EqualTo直接校验
    def validate_pwd_confirm (self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        """
        # 最开始初始化时,self.data中已经有所有的值

        if field.data != self.data[\'pwd\']:
            raise validators.ValidationError("密码不一致") # 继续后续验证
            #raise validators.StopValidation("密码不一致123123123")  # 不再继续后续验证

    name = simple.StringField(
        label=\'用户名\',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={\'class\': \'form-control\'},
        default=\'zack\'
    )

    pwd = simple.PasswordField(
        label=\'密码\',
        validators=[
            validators.DataRequired(message=\'密码不能为空.\')
        ],
        widget=widgets.PasswordInput(),
        render_kw={\'class\': \'form-control\'}
    )

    pwd_confirm = simple.PasswordField(
        label=\'重复密码\',
        validators=[
            #validators.DataRequired(message=\'重复密码不能为空.\'),
            validate_pwd_confirm,
            # 校验等于pwd
            validators.EqualTo(\'pwd\', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={\'class\': \'form-control\'}
    )

    email = html5.EmailField(
        label=\'邮箱\',
        validators=[
            validators.DataRequired(message=\'邮箱不能为空.\'),
            validators.Email(message=\'邮箱格式错误\')
        ],
        widget=widgets.TextInput(input_type=\'email\'),
        render_kw={\'class\': \'form-control\'}
    )

    gender = core.RadioField(
        label=\'性别\',
        choices=(
            (1, \'\'),
            (2, \'\'),
        ),
        coerce=int # “1” “2”
     )
    city = core.SelectField(
        label=\'城市\',
        choices=(
            (\'bj\', \'北京\'),
            (\'sh\', \'上海\'),
        )
    )

    hobby = core.SelectMultipleField(
        label=\'爱好\',
        choices=(
            (1, \'篮球\'),
            (2, \'足球\'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label=\'喜好\',
        choices=(
            (1, \'篮球\'),
            (2, \'足球\'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    # def __init__(self, *args, **kwargs):
    #     super(RegisterForm, self).__init__(*args, **kwargs)
    #     self.favor.choices = ((1, \'篮球\'), (2, \'足球\'), (3, \'羽毛球\'))




@app.route(\'/register\', methods=[\'GET\', \'POST\'])
def register():
    if request.method == \'GET\':
        form = RegisterForm()  # initial
        return render_template(\'register.html\', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print(\'用户提交数据通过格式验证,提交的值为:\', form.data)
        else:
            print(form.errors)
        return render_template(\'register.html\', form=form)



if __name__ == \'__main__\':
    app.run()
register.py
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for field in form %}
    <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>
</html>
register.html

 

 2、flask的ORM  sqlalchemy

pip3 install sqlalchemy

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果

1、ORM简单使用

1、创建和删除表

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()


class User(Base):
    __tablename__ = \'user\'  # 数据库表名
    id = Column(Integer, primary_key=True)  # 主键
    name = Column(String(32), index=True, nullable=False)  # 索引,不能为空
    age = Column(Integer)

    def __repr__(self):
        return self.name


# 根据类创建数据库表
def init_db():
    engine = create_engine(
        # 数据库名+链接数据库://root:密码@ip:port/数据库名?charset=字符集
        \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\',
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.create_all(engine)


# 根据类删除数据库表
def drop_db():
    engine = create_engine(
        # 数据库名+链接数据库://root:密码@ip:port/数据库名?charset=字符集
        \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\',
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)


if __name__ == \'__main__\':
    # drop_db()  # 删除表
    init_db()  # 创建表

2、给表添加数据

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User

# 1 连接引擎
engine = create_engine(
    \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\',
)
Connection = sessionmaker(bind=engine)

# 2 每次执行数据库操作时都要连接,后面一般用session=Connection()
conn = Connection()

# 1 单增
# 创建数据
obj = User(name=\'Immy\', age=18)
# 将数据添加到表中
conn.add(obj)

# 2 群增
conn.add_all([
    User(name=\'zack\', age=18),
    User(name=\'vicky\', age=20),
    # 或者其他表也行
    # Order(name=\'aaa\', price=100)
])

# 3 删除(整体删)
conn.query(User).delete()

# 4 改(传字典的形式)
# 方式1:群改
conn.query(User).update({\'name\':\'tank\', \'age\': 17})
# 方式2:群改,类似Django的f查询,加字符串的时候必需要synchronize_session=False
conn.query(User).update({User.name:User.name+\' is sb\', \'age\': 12},synchronize_session=False)
# 方式3:加数字的时候可以直接加,也可以设置synchronize_session=False
conn.query(User).update({User.age:User.age+10})

# 5 查(查不需要commit,也能拿到结果)
# 打印SQL语句
res = conn.query(User)
print(res)

# 查所有,得到列表
res = conn.query(User).all()
print(res)

# 查单条记录
res = conn.query(User).first()
print(res.age)

# 查询哪些字段,.label并将字段取别名隐藏本身名字
res = conn.query(User.age, User.name.label(\'yhm\')).first()
print(res.age, res.yhm)

# 6 过滤用filter(传表达式)或者用filter_by(传参数
res = conn.query(User).filter(User.name == \'zack\').first()
res = conn.query(User).filter_by(name=\'zack\').first()
print(res)

# 利用filter过滤修改
conn.query(User).filter(User.name == \'vicky\').update({User.name:\'wxm\', \'age\': 1})
res = conn.query(User).filter(User.name == \'wxm\').first()
print(res.age, res.name)

# 3 必须提交才能生效
conn.commit()
# 4 关闭连接,将连接放回连接池
conn.close()

3、单表查询

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User

engine = create_engine(
    \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\'
)
Connection = sessionmaker(bind=engine)
# 一般都用session
session = Connection()

# 1 表达式的and条件用 , 连接
ret = session.query(User).filter(User.name==\'zack\', User.age==18).first()
print(ret)

# 2 表达式的between条件
ret = session.query(User).filter(User.age.between(18, 20)).all()
print(ret)

# 3 sql查询的in_操作,相当于Django中的__in
ret = session.query(User).filter(User.id.in_([7,8,9])).all()
print(ret)

# 4 查询取反 ~
ret = session.query(User).filter(~User.id.in_([7,8])).all()
print(ret)

# 5 or查询and查询,or_,and_需要导入
from sqlalchemy import or_, and_

ret = session.query(User).filter(or_(User.id == 7, User.name == \'wxm\')).all()
res = session.query(User).filter(and_(User.id == 7, User.name == \'Immy\')).all()
print(ret, res)
# or_与and_联合使用
ret =session.query(User).filter(or_(User.id == 7,
                                    and_(User.age==18, User.name==\'zack\'))).all()
print(ret)

# 6 like查询
# 必须以I开头
ret = session.query(User).filter(User.name.like("I%")).all()
print(ret)
# 第二个字母是m,_m
ret = session.query(User).filter(User.name.like("_m%")).all()
print(ret)
# 不以I开头的,~
ret = session.query(User).filter(~User.name.like("I%")).all()
print(ret)

# 7 排序,order_by
# 降序desc
ret = session.query(User).filter(User.id>1).order_by(User.id.desc()).all()
print(ret)
# 升序,asc
ret = session.query(User).filter(User.id>1).order_by(User.id.asc()).all()
print(ret)
# 先升序再降序,用 , 隔开
# 先按年龄升序如果有相同年龄的再按id降序
ret =session.query(User).filter(User.id>=1).order_by(User.age.asc(), User.id.desc()).all()
print(ret)

# 8 分组查询
# 按照年龄分组
ret = session.query(User).group_by(User.name).all()
print(ret)
# 分组后要聚合操作需要用func
from sqlalchemy.sql import func
# 选出年龄最小大于等于18的组
ret = session.query(User).group_by(User.name).having(func.min(User.age) >= 18).all()
print(ret)
# 选出组内最小年纪大于等于18的组,查询组内的最小年龄与最大年龄的和与名字
ret = session.query(User.name,
                    func.min(User.age),
                    func.max(User.age),
                    func.sum(User.age)
                    ).group_by(User.name).having(func.min(User.age) >= 18).all()
print(ret)

2、ORM表关系

1、一对多

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()


class Hobby(Base):
    __tablename__ = \'hobby\'
    id = Column(Integer, primary_key=True)
    catption = Column(String(32), default=\'洗脚\')

class Person(Base):
    __tablename__ = \'person\'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32))
    # 外键hobby值tablename而不是Hobby类名,
    hobby_id = Column(Integer, ForeignKey(\'hobby.id\'))
    # 更新数据库没有关系,不会增加新字段,只能用于快速连表查询
    # relationship的第一个参数,是类名,第二个参数backref,用于反向查询
    hobby = relationship("Hobby", backref="pres")

    def __repr__(self):
        return self.name

# 根据类创建数据库表
def init_db():
    engine = create_engine(
        # 数据库名+链接数据库://root:密码@ip:port/数据库名?charset=字符集
        \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\',
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.create_all(engine)


# 根据类删除数据库表
def drop_db():
    engine = create_engine(
        # 数据库名+链接数据库://root:密码@ip:port/数据库名?charset=字符集
        \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\',
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)


if __name__ == \'__main__\':
    # drop_db()  # 删除表
    init_db()  # 创建表
models.py
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Hobby, Person

engine = create_engine(
    \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\'
)
Connection = sessionmaker(bind=engine)
# 一般都用session
session = Connection()

# 1 单独给两张表添加数据,不使用关联关系
session.add_all([
    Hobby(catption=\'吃鸡\'),
    Hobby(catption=\'学习\'),
    Person(name=\'vicky\', hobby_id=1),
    Person(name=\'zack\', hobby_id=2)
])

# 2 用关联关系 添加
person = Person(name=\'Mr沈\', hobby=Hobby(catption=\'学习\'))
session.add(person)

hobby = Hobby(catption=\'旅游\')
hobby.pres = [Person(name=\'Immy\'), Person(name=\'zack\')]
session.add(hobby)

# 3 正向查询
pr = session.query(Person).filter(Person.name==\'vicky\').first()
print(pr)
print(pr.hobby.catption)

# 4 反向查询
ver = session.query(Hobby).filter(Hobby.catption==\'旅游\').first()
print(ver.catption)
print(ver.pres)

# 5 如果不用relationship连表,我们自己连表查询,isouter=True表示是left join,不填默认为inner join
person_list = session.query(Hobby).join(Person, Person.hobby_id == Hobby.id, isouter=True).all()
print(person_list)

session.commit()
session.close()

2、多对多

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()

# 一个男孩可以喜欢多个女孩,一个女孩也可以喜欢多个男孩
class Boy2Girl(Base):
    __tablename__ = "boy2girl"
    id = Column(Integer, primary_key=True)
    girl_id = Column(Integer, ForeignKey("girl.id"))
    boy_id = Column(Integer, ForeignKey("boy.id"))


class Girl(Base):
    __tablename__ = "girl"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)

    def __repr__(self):
        return self.name


class Boy(Base):
    __tablename__ = "boy"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    #secondary=boy2girl 中间表的表名
    girl = relationship("Girl",secondary="boy2girl",backref = "boys")

    def __repr__(self):
        return self.name


# 根据类创建数据库表
def init_db():
    engine = create_engine(
        # 数据库名+链接数据库://root:密码@ip:port/数据库名?charset=字符集
        \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\',
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.create_all(engine)


# 根据类删除数据库表
def drop_db():
    engine = create_engine(
        # 数据库名+链接数据库://root:密码@ip:port/数据库名?charset=字符集
        \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\',
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)


if __name__ == \'__main__\':
    # drop_db()  # 删除表
    init_db()  # 创建表
models.py
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Boy, Girl, Boy2Girl

engine = create_engine(
    \'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8\'
)
Connection = sessionmaker(bind=engine)
# 一般都用session
session = Connection()

# 利用关联添加数据
boy = Boy(name=\'Zack\')
boy.girl=[Girl(name=\'vicky\'), Girl(name=\'wxm\')]
session.add(boy)
session.commit()

girl = Girl(name=\'vicky\')
girl.boys=[Boy(name=\'Mr沈\'), Boy(name=\'zack\')]
session.add(girl)
session.commit()

# 使用relationship的关系,正向查
b = session.query(Boy).filter(Boy.name == \'zack\').first()
print(b.name)
print(b.girl)

# 使用relationship的关系,反向查
g = session.query(Girl).filter(Girl.name==\'vicky\').first()
print(g.name)
print(g.boys)

3、Flask-SQLAlchemy与 flask_migrate

 flask_sqlalchemy是flask和SQLAchemy的管理者,通过他把他们做连接的ORM

要用就必须先安装。
所有的到导入都找 下面的db
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

flask_migrate数据库迁移命令

命令:manager.add_command(\'db1\', MigrateCommand)
# 1 当项目第一次执行迁移的时候。
python3 manage.py db1 init  # 只需要初始化一次

python3 manage.py db1 migrate # 等同于django的makemigrations

python3 manage.py db1 upgrade # 等同于django的migrate

 

分类:

技术点:

相关文章: